You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/25 13:11:53 UTC

[incubator-streampipes-extensions] 02/03: [STREAMPIPES-447] Processing Element: Detect Value Change

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit e1152e748b867ec4bab859609d74d528b3744ca0
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 25 15:03:44 2021 +0200

    [STREAMPIPES-447] Processing Element: Detect Value Change
---
 .../processors/enricher/jvm/EnricherJvmInit.java   |  4 +-
 ...geController.java => ValueChangeProcessor.java} | 45 ++++++++++++++--------
 .../documentation.md                               |  5 ++-
 .../strings.en                                     | 13 ++++++-
 .../numericalfilter/NumericalFilterProcessor.java  |  2 +-
 5 files changed, 47 insertions(+), 22 deletions(-)

diff --git a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherJvmInit.java b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherJvmInit.java
index 0fbcbdf..d022600 100644
--- a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherJvmInit.java
+++ b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherJvmInit.java
@@ -30,7 +30,7 @@ import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalController;
 import org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureController;
-import org.apache.streampipes.processors.enricher.jvm.processor.valueChange.ValueChangeController;
+import org.apache.streampipes.processors.enricher.jvm.processor.valueChange.ValueChangeProcessor;
 
 public class EnricherJvmInit extends StandaloneModelSubmitter {
 
@@ -46,7 +46,7 @@ public class EnricherJvmInit extends StandaloneModelSubmitter {
             8090)
             .registerPipelineElements(new SizeMeasureController(),
                     new JSEvalController(),
-                    new ValueChangeController())
+                    new ValueChangeProcessor())
             .registerMessagingFormats(
                     new JsonDataFormatFactory(),
                     new CborDataFormatFactory(),
diff --git a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeController.java b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeProcessor.java
similarity index 54%
rename from streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeController.java
rename to streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeProcessor.java
index a378ae6..4200755 100644
--- a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeController.java
+++ b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeProcessor.java
@@ -15,44 +15,59 @@ import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.standalone.ProcessorParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class ValueChangeController extends StreamPipesDataProcessor {
-	private static final String CHANGEVALUE_MAPPING = "changevalue-mapping";
-	private static final String USER_INPUT_MAPPING = "user-input-mapping";
+public class ValueChangeProcessor extends StreamPipesDataProcessor {
+	private static final String CHANGE_VALUE_MAPPING_ID = "change-value-mapping";
+	private static final String FROM_PROPERTY_VALUE_ID = "from-property-value";
+	private static final String TO_PROPERTY_VALUE_ID = "to-property-value";
+	private static final String IS_CHANGED_ID = "is-changed";
 	private static final String IS_CHANGED = "isChanged";
 
-
-	private float initValue;
+	private String  mappingProperty;
+	private float userDefinedFrom;
+	private float userDefinedTo;
+	private float lastValueOfEvent;
 
 	@Override
 	public DataProcessorDescription declareModel() {
-		return ProcessingElementBuilder.create("org.apache.streampipes.processors.enricher.jvm.valueChange","ValueChange","A value change data processor which return a boolean on data change")
+		return ProcessingElementBuilder.create("org.apache.streampipes.processors.enricher.jvm.valueChange")
 				.category(DataProcessorType.ENRICH)
 				.withAssets(Assets.DOCUMENTATION, Assets.ICON)
 				.withLocales(Locales.EN)
-				.requiredFloatParameter(Labels.withId(USER_INPUT_MAPPING))
+				.requiredFloatParameter(Labels.withId(FROM_PROPERTY_VALUE_ID))
+				.requiredFloatParameter(Labels.withId(TO_PROPERTY_VALUE_ID))
 				.requiredStream(StreamRequirementsBuilder
 						.create()
 						.requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-							Labels.withId(CHANGEVALUE_MAPPING),
+							Labels.withId(CHANGE_VALUE_MAPPING_ID),
 							PropertyScope.NONE)
 						.build())
-				.outputStrategy(OutputStrategies.append(EpProperties.booleanEp(Labels.withId(IS_CHANGED),
-					IS_CHANGED,SO.Boolean)))
+				.outputStrategy(OutputStrategies.append(
+						EpProperties.booleanEp(Labels.withId(IS_CHANGED_ID), IS_CHANGED, SO.Boolean)))
 				.build();
 	}
 
 	@Override
 	public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
-		this.initValue = processorParams.extractor().singleValueParameter(USER_INPUT_MAPPING,Float.class);
+		this.lastValueOfEvent = Float.MAX_VALUE;
+		this.userDefinedFrom = processorParams.extractor().singleValueParameter(FROM_PROPERTY_VALUE_ID, Float.class);
+		this.userDefinedTo = processorParams.extractor().singleValueParameter(TO_PROPERTY_VALUE_ID, Float.class);
+		this.mappingProperty = processorParams.extractor().mappingPropertyValue(CHANGE_VALUE_MAPPING_ID);
 	}
 
 	@Override
 	public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
-		float currValue = event.getFieldBySelector(CHANGEVALUE_MAPPING).getAsPrimitive().getAsFloat();
-		if(currValue == this.initValue)
+		float thisValue = event.getFieldBySelector(mappingProperty).getAsPrimitive().getAsFloat();
+		if (this.lastValueOfEvent != Float.MAX_VALUE) {
+			if (this.lastValueOfEvent == this.userDefinedFrom && thisValue  == this.userDefinedTo) {
+				event.addField(IS_CHANGED,true);
+			} else {
+				event.addField(IS_CHANGED,false);
+			}
+		} else {
 			event.addField(IS_CHANGED,false);
-		else
-			event.addField(IS_CHANGED,true);
+		}
+
+		this.lastValueOfEvent =  thisValue;
 		spOutputCollector.collect(event);
 	}
 
diff --git a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/documentation.md b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/documentation.md
index be695c9..6a74d9d 100644
--- a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/documentation.md
+++ b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/documentation.md
@@ -13,10 +13,11 @@
   ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
+
   ~
   -->
 
-## Size Measure
+## Value Change
 
 <p align="center"> 
     <img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -26,7 +27,7 @@
 
 ## Description
 
-Measures the size of an incoming event and appends this number to the event by serializing it.
+The processing element should be able to detect when a numeric property change from one configured value to another.
 
 ***
 
diff --git a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/strings.en b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/strings.en
index e63e1a1..873d715 100644
--- a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/strings.en
+++ b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/strings.en
@@ -1,5 +1,14 @@
 org.apache.streampipes.processors.enricher.jvm.valueChange.title=Value Change
 org.apache.streampipes.processors.enricher.jvm.valueChange.description=The processing element should be able to detect when a numeric property change from one configured value to another
 
-valueChange.title=Value Change
-valueChange.description=The unit in which the size of the event should be added
\ No newline at end of file
+change-value-mapping.title=Property to monitor
+change-value-mapping.description=The property where the values are monitored
+
+from-property-value.title=From Value
+from-property-value.description=Result is true, when the property value switches from this value to the To Value
+
+to-property-value.title=To Value
+to-property-value.description=Result is true, wWhen the property value switches from the from value to this
+
+is-changed.title=isChanged
+is-changed.description=
\ No newline at end of file
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java
index d55b1eb..990c667 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java
@@ -60,7 +60,7 @@ public class NumericalFilterProcessor extends StreamPipesDataProcessor {
             .outputStrategy(OutputStrategies.keep())
             .requiredSingleValueSelection(Labels.withId(OPERATION), Options.from("<", "<=", ">",
                     ">=", "==", "!="))
-            .requiredFloatParameter(Labels.withId(VALUE), NUMBER_MAPPING)
+            .requiredFloatParameter(Labels.withId(VALUE))
             .build();
 
   }