You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/11/03 19:15:16 UTC
[incubator-streampipes] 01/01: refactored cumsum processor
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch chore/refactor-processing-elemets
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 5e70e39a7e8550160b87b6d35b9022a646c34bce
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Nov 3 20:10:00 2022 +0100
refactored cumsum processor
co-authored-by: smlabt <gi...@samuealbt.de>
---
.../jvm/ChangeDetectionJvmInit.java | 6 +-
.../changedetection/jvm/cusum/Cusum.java | 1 +
.../changedetection/jvm/cusum/CusumController.java | 1 +
.../jvm/cusum/CusumEventFields.java | 2 +
.../changedetection/jvm/cusum/CusumParameters.java | 1 +
.../jvm/cusum/WelfordAggregate.java | 1 +
.../jvm/{cusum => welford}/WelfordAggregate.java | 6 +-
.../jvm/welford/WelfordChangeDetection.java | 132 +++++++++++++++++++++
.../jvm/welford/WelfordEventFields.java | 19 +++
.../icon.png | Bin 0 -> 62957 bytes
.../documentation.md | 59 +++++++++
.../icon.png | Bin 0 -> 62957 bytes
.../strings.en | 11 ++
.../StandaloneEventProcessingDeclarer.java | 1 +
.../binding/EventProcessorBindingParams.java | 1 +
.../wrapper/runtime/EventProcessor.java | 1 +
16 files changed, 236 insertions(+), 6 deletions(-)
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
index ef55d2832..c561bf9b7 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumController;
+import org.apache.streampipes.processors.changedetection.jvm.welford.WelfordChangeDetection;
public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {
@@ -43,7 +44,10 @@ public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {
"Processors Change Detection JVM",
"",
8090)
- .registerPipelineElements(new CusumController())
+ .registerPipelineElements(
+ new CusumController(),
+ new WelfordChangeDetection()
+ )
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
index 8c997b833..b65a01e6e 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.runtime.EventProcessor;
+@Deprecated(since="0.70.0", forRemoval = true)
public class Cusum implements EventProcessor<CusumParameters> {
private String selectedNumberMapping;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
index 5fad75db9..8028fced5 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
import java.util.Arrays;
+@Deprecated(since = "0.70.0", forRemoval = true)
public class CusumController extends StandaloneEventProcessingDeclarer<CusumParameters> {
private static final String NUMBER_MAPPING = "number-mapping";
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
index 555c38964..f3fff0b30 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
@@ -18,6 +18,8 @@
package org.apache.streampipes.processors.changedetection.jvm.cusum;
+
+@Deprecated(since = "0.70.0", forRemoval = true)
public class CusumEventFields {
public static final String VAL_LOW = "cusumLow";
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
index 901f5c734..d243f7a53 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.processors.changedetection.jvm.cusum;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+@Deprecated(since = "0.70.0", forRemoval = true)
public class CusumParameters extends EventProcessorBindingParams {
private String selectedNumberMapping;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
index 3479bb7c3..019c494c1 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.processors.changedetection.jvm.cusum;
+@Deprecated(since = "0.70.0", forRemoval = true)
public class WelfordAggregate {
private Integer count;
private Double mean;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
similarity index 90%
copy from streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
copy to streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
index 3479bb7c3..e447c318b 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.processors.changedetection.jvm.cusum;
+package org.apache.streampipes.processors.changedetection.jvm.welford;
public class WelfordAggregate {
private Integer count;
@@ -52,8 +52,4 @@ public class WelfordAggregate {
public Double getSampleStd() {
return Math.sqrt(getSampleVariance());
}
-
- public Double getPopulationStd() {
- return Math.sqrt(getPopulationVariance());
- }
}
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
new file mode 100644
index 000000000..e084377fd
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
@@ -0,0 +1,132 @@
+package org.apache.streampipes.processors.changedetection.jvm.welford;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumEventFields;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.Arrays;
+
+public class WelfordChangeDetection extends StreamPipesDataProcessor {
+
+ private static final String NUMBER_MAPPING = "number-mapping";
+ private static final String PARAM_K = "param-k";
+ private static final String PARAM_H = "param-h";
+
+ private String selectedNumberMapping;
+ private Double k;
+ private Double h;
+ private Double cuSumLow;
+ private Double cuSumHigh;
+ private WelfordAggregate welfordAggregate;
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford")
+ .category(DataProcessorType.VALUE_OBSERVER)
+ .withAssets(Assets.DOCUMENTATION)
+ .withAssets(Assets.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+ Labels.withId(NUMBER_MAPPING),
+ PropertyScope.NONE).build())
+ .requiredFloatParameter(Labels.withId(PARAM_K), 0.0f, 0.0f, 100.0f, 0.01f)
+ .requiredFloatParameter(Labels.withId(PARAM_H), 0.0f, 0.0f, 100.0f, 0.01f)
+ .outputStrategy(
+ OutputStrategies.append(
+ Arrays.asList(
+ EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_LOW.label, SO.Number),
+ EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_HIGH.label, SO.Number),
+ EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_LOW.label, SO.Boolean),
+ EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_HIGH.label, SO.Boolean)
+ )
+ ))
+ .build();
+ }
+
+ @Override
+ public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+
+ ProcessingElementParameterExtractor extractor = parameters.extractor();
+ this.selectedNumberMapping = extractor.singleValueParameter(NUMBER_MAPPING, String.class);
+ this.k = extractor.singleValueParameter(PARAM_K, Double.class);
+ this.h = extractor.singleValueParameter(PARAM_H, Double.class);
+ this.cuSumLow = 0.0;
+ this.cuSumHigh = 0.0;
+ this.welfordAggregate = new WelfordAggregate();
+
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+
+ Double number = event.getFieldBySelector(selectedNumberMapping).getAsPrimitive().getAsDouble();
+ welfordAggregate.update(number); // update mean and standard deviation
+ Double normalized = getZScoreNormalizedValue(number);
+ updateStatistics(normalized);
+
+
+ Boolean isChangeHigh = getTestResult(this.cuSumHigh, h);
+ Boolean isChangeLow = getTestResult(this.cuSumLow, h);
+
+ Event updatedEvent = updateEvent(event, this.cuSumLow, this.cuSumHigh, isChangeLow, isChangeHigh);
+ collector.collect(updatedEvent);
+
+ if (isChangeHigh || isChangeLow) {
+ resetAfterChange();
+ }
+
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ this.cuSumLow = 0.0;
+ this.cuSumHigh = 0.0;
+ }
+
+ private Double getZScoreNormalizedValue(Double value) {
+ Double mean = welfordAggregate.getMean();
+ Double std = welfordAggregate.getSampleStd();
+ return (value - mean) / std;
+ }
+
+ private void updateStatistics(Double newValue) {
+ if (newValue.isNaN()) {
+ return;
+ }
+ this.cuSumHigh = Math.max(0, this.cuSumHigh + newValue - k);
+ this.cuSumLow = Math.min(0, this.cuSumLow + newValue + k);
+ }
+
+ private Boolean getTestResult(Double cusum, Double h) {
+ return Math.abs(cusum) > this.h;
+ }
+
+ private Event updateEvent(Event event, Double cusumLow, Double cusumHigh, Boolean decisionLow, Boolean decisionHigh) {
+ event.addField(WelfordEventFields.VAL_LOW.label, cusumLow);
+ event.addField(WelfordEventFields.VAL_HIGH.label, cusumHigh);
+ event.addField(WelfordEventFields.DECISION_LOW.label, decisionLow);
+ event.addField(WelfordEventFields.DECISION_HIGH.label, decisionHigh);
+ return event;
+ }
+
+ private void resetAfterChange() {
+ this.cuSumHigh = 0.0;
+ this.cuSumLow = 0.0;
+ welfordAggregate = new WelfordAggregate();
+ }
+}
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
new file mode 100644
index 000000000..b4e7098dc
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
@@ -0,0 +1,19 @@
+package org.apache.streampipes.processors.changedetection.jvm.welford;
+
+public enum WelfordEventFields {
+ VAL_LOW("cumSumLow"),
+ VAL_HIGH("cumSumHigh"),
+ DECISION_LOW("changeDetectedLow"),
+ DECISION_HIGH("changeDetectedHigh");
+
+ public final String label;
+
+ WelfordEventFields(String label) {
+ this.label = label;
+ }
+
+ @Override
+ public String toString() {
+ return this.label;
+ }
+}
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png
new file mode 100644
index 000000000..444fa4e2c
Binary files /dev/null and b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png differ
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
new file mode 100644
index 000000000..9dfaf814d
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
@@ -0,0 +1,59 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+## Cusum (Cumulative Sum)
+
+<!--
+<p align="center">
+ <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+-->
+
+***
+
+## Description
+
+Performs change detection on a single dimension of the incoming data stream. A change is detected if the cumulative deviation from the mean exceeds a certain threshold. This implementation tracks the mean and the standard deviation using Welford's algorithm, which is well suited for data streams.
+
+***
+
+## Required input
+
+The Welford change detection processor requires a data stream that has at least one field containing a numerical value.
+
+***
+
+## Configuration
+
+### Value to observe
+Specify the dimension of the data stream (e.g. the temperature) on which to perform change detection.
+
+### Parameter `k`
+`k` controls the sensitivity of the change detector. Its unit is standard deviations. For an observation `x_n`, the upper Cusum value is `S_n = max(0, S_{n-1} + z-score(x_n) - k)`. Thus, the Cusum score `S` increases if `z-score(x_n) > k`.
+
+### Parameter `h`
+The alarm threshold in standard deviations. An alarm occurs if `S_n > h`.
+
+## Output
+
+This processor outputs the original data stream plus
+
+- `cumSumLow`: The cusum value for negative changes
+- `cumSumHigh`: The cusum value for positive changes
+- `changeDetectedLow`: Boolean indicating if a negative change was detected
+- `changeDetectedHigh`: Boolean indicating if a positive change was detected
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png
new file mode 100644
index 000000000..444fa4e2c
Binary files /dev/null and b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png differ
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
new file mode 100644
index 000000000..72b38eee7
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
@@ -0,0 +1,11 @@
+org.apache.streampipes.processors.changedetection.jvm.welford.title=Cusum
+org.apache.streampipes.processors.changedetection.jvm.welford.description=
+
+number-mapping.title=Value to observe
+number-mapping.description=Specifies the monitored dimension.
+
+param-k.title=Parameter k
+param-k.description=The sensitivity parameter. High value indicates low sensitivity. Unit: Standard deviations
+
+param-h.title=Parameter h
+param-h.description=The threshold above which a change is detected. Unit: Standard deviations
\ No newline at end of file
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
index 9cba63920..7bbc6d84c 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
+@Deprecated(since = "0.70.0", forRemoval = true)
public abstract class StandaloneEventProcessingDeclarer<B extends
EventProcessorBindingParams> extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
index f00f12dc9..79971e03a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+@Deprecated(since = "0.70.0", forRemoval = true)
public abstract class EventProcessorBindingParams extends
BindingParams<DataProcessorInvocation> implements
Serializable {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
index bee014951..f6538321a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+@Deprecated(since = "0.70.0", forRemoval = true)
public interface EventProcessor<B extends EventProcessorBindingParams> extends
PipelineElement<B, DataProcessorInvocation> {