You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/11/03 19:15:16 UTC

[incubator-streampipes] 01/01: refactored cumsum processor

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/refactor-processing-elemets
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 5e70e39a7e8550160b87b6d35b9022a646c34bce
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Nov 3 20:10:00 2022 +0100

    refactored cumsum processor
    
    co-authored-by: smlabt <gi...@samuealbt.de>
---
 .../jvm/ChangeDetectionJvmInit.java                |   6 +-
 .../changedetection/jvm/cusum/Cusum.java           |   1 +
 .../changedetection/jvm/cusum/CusumController.java |   1 +
 .../jvm/cusum/CusumEventFields.java                |   2 +
 .../changedetection/jvm/cusum/CusumParameters.java |   1 +
 .../jvm/cusum/WelfordAggregate.java                |   1 +
 .../jvm/{cusum => welford}/WelfordAggregate.java   |   6 +-
 .../jvm/welford/WelfordChangeDetection.java        | 132 +++++++++++++++++++++
 .../jvm/welford/WelfordEventFields.java            |  19 +++
 .../icon.png                                       | Bin 0 -> 62957 bytes
 .../documentation.md                               |  59 +++++++++
 .../icon.png                                       | Bin 0 -> 62957 bytes
 .../strings.en                                     |  11 ++
 .../StandaloneEventProcessingDeclarer.java         |   1 +
 .../binding/EventProcessorBindingParams.java       |   1 +
 .../wrapper/runtime/EventProcessor.java            |   1 +
 16 files changed, 236 insertions(+), 6 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
index ef55d2832..c561bf9b7 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumController;
+import org.apache.streampipes.processors.changedetection.jvm.welford.WelfordChangeDetection;
 
 public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {
 
@@ -43,7 +44,10 @@ public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {
                 "Processors Change Detection JVM",
                 "",
                 8090)
-                .registerPipelineElements(new CusumController())
+                .registerPipelineElements(
+                        new CusumController(),
+                        new WelfordChangeDetection()
+                )
                 .registerMessagingFormats(
                         new JsonDataFormatFactory(),
                         new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
index 8c997b833..b65a01e6e 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 
+@Deprecated(since="0.70.0", forRemoval = true)
 public class Cusum implements EventProcessor<CusumParameters> {
 
     private String selectedNumberMapping;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
index 5fad75db9..8028fced5 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
 
 import java.util.Arrays;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumController extends StandaloneEventProcessingDeclarer<CusumParameters> {
 
     private static final String NUMBER_MAPPING = "number-mapping";
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
index 555c38964..f3fff0b30 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
@@ -18,6 +18,8 @@
 
 package org.apache.streampipes.processors.changedetection.jvm.cusum;
 
+
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumEventFields {
 
     public static final String VAL_LOW = "cusumLow";
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
index 901f5c734..d243f7a53 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.processors.changedetection.jvm.cusum;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumParameters  extends EventProcessorBindingParams {
 
     private String selectedNumberMapping;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
index 3479bb7c3..019c494c1 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.processors.changedetection.jvm.cusum;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class WelfordAggregate {
     private Integer count;
     private Double mean;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
similarity index 90%
copy from streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
copy to streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
index 3479bb7c3..e447c318b 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.processors.changedetection.jvm.cusum;
+package org.apache.streampipes.processors.changedetection.jvm.welford;
 
 public class WelfordAggregate {
     private Integer count;
@@ -52,8 +52,4 @@ public class WelfordAggregate {
     public Double getSampleStd() {
         return Math.sqrt(getSampleVariance());
     }
-
-    public Double getPopulationStd() {
-        return Math.sqrt(getPopulationVariance());
-    }
 }
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
new file mode 100644
index 000000000..e084377fd
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
@@ -0,0 +1,132 @@
+package org.apache.streampipes.processors.changedetection.jvm.welford;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumEventFields;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.Arrays;
+
+/**
+ * Data processor that performs a two-sided cumulative sum (CUSUM) test on a single
+ * numerical field of the incoming stream.
+ *
+ * <p>Each incoming value is fed into a {@link WelfordAggregate}, z-score normalized with the
+ * aggregate's mean and sample standard deviation, and then added to an upper and a lower
+ * cumulative sum. The event is forwarded with both sums and two boolean flags appended. Once
+ * either flag is raised, the sums and the aggregate are reset.
+ */
+public class WelfordChangeDetection extends StreamPipesDataProcessor {
+
+    private static final String NUMBER_MAPPING = "number-mapping";
+    private static final String PARAM_K = "param-k";
+    private static final String PARAM_H = "param-h";
+
+    /** Selector of the event field that is observed. */
+    private String selectedNumberMapping;
+    /** Slack subtracted from (upper sum) or added to (lower sum) every normalized value. */
+    private Double k;
+    /** Threshold the absolute value of a cumulative sum has to exceed to raise a flag. */
+    private Double h;
+    private Double cuSumLow;
+    private Double cuSumHigh;
+    private WelfordAggregate welfordAggregate;
+
+    /**
+     * Declares the processor: one numerical input mapping, the two float parameters
+     * {@code param-k} and {@code param-h}, and an append output strategy that adds the
+     * four fields named in {@link WelfordEventFields}.
+     *
+     * @return the description of this processor
+     */
+    @Override
+    public DataProcessorDescription declareModel() {
+        return ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford")
+                .category(DataProcessorType.VALUE_OBSERVER)
+                .withAssets(Assets.DOCUMENTATION)
+                .withAssets(Assets.ICON)
+                .withLocales(Locales.EN)
+                .requiredStream(StreamRequirementsBuilder
+                        .create()
+                        .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                                Labels.withId(NUMBER_MAPPING),
+                                PropertyScope.NONE).build())
+                .requiredFloatParameter(Labels.withId(PARAM_K), 0.0f, 0.0f, 100.0f, 0.01f)
+                .requiredFloatParameter(Labels.withId(PARAM_H), 0.0f, 0.0f, 100.0f, 0.01f)
+                .outputStrategy(
+                        OutputStrategies.append(
+                                Arrays.asList(
+                                        EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_LOW.label, SO.Number),
+                                        EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_HIGH.label, SO.Number),
+                                        EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_LOW.label, SO.Boolean),
+                                        EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_HIGH.label, SO.Boolean)
+                                )
+                        ))
+                .build();
+    }
+
+    /**
+     * Reads the user configuration and initializes the detector state.
+     *
+     * @param parameters        invocation parameters the configuration is extracted from
+     * @param spOutputCollector output collector (not used here)
+     * @param runtimeContext    runtime context (not used here)
+     * @throws SpRuntimeException declared by the overridden method
+     */
+    @Override
+    public void onInvocation(ProcessorParams parameters,
+                             SpOutputCollector spOutputCollector,
+                             EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+
+        ProcessingElementParameterExtractor extractor = parameters.extractor();
+        this.selectedNumberMapping = extractor.singleValueParameter(NUMBER_MAPPING, String.class);
+        this.k = extractor.singleValueParameter(PARAM_K, Double.class);
+        this.h = extractor.singleValueParameter(PARAM_H, Double.class);
+        resetState();
+
+    }
+
+    /**
+     * Updates the running statistics with the observed value, evaluates both tests and
+     * forwards the event enriched with the sums and the test results.
+     *
+     * @param event     incoming event containing the observed field
+     * @param collector collector the enriched event is handed to
+     * @throws SpRuntimeException declared by the overridden method
+     */
+    @Override
+    public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+
+        Double number = event.getFieldBySelector(selectedNumberMapping).getAsPrimitive().getAsDouble();
+        welfordAggregate.update(number);  // update mean and standard deviation
+        Double normalized = getZScoreNormalizedValue(number);
+        updateStatistics(normalized);
+
+        Boolean isChangeHigh = getTestResult(this.cuSumHigh, this.h);
+        Boolean isChangeLow = getTestResult(this.cuSumLow, this.h);
+
+        Event updatedEvent = updateEvent(event, this.cuSumLow, this.cuSumHigh, isChangeLow, isChangeHigh);
+        collector.collect(updatedEvent);
+
+        // The event is emitted with the pre-reset sums; the reset only affects later events.
+        if (isChangeHigh || isChangeLow) {
+            resetState();
+        }
+
+    }
+
+    /**
+     * Clears both cumulative sums when the processor is detached.
+     *
+     * @throws SpRuntimeException declared by the overridden method
+     */
+    @Override
+    public void onDetach() throws SpRuntimeException {
+        this.cuSumLow = 0.0;
+        this.cuSumHigh = 0.0;
+    }
+
+    /**
+     * Normalizes a value with the aggregate's current mean and sample standard deviation.
+     *
+     * @param value the raw observation
+     * @return {@code (value - mean) / std}; NaN for {@code 0 / 0}, which is filtered out
+     *         by {@link #updateStatistics(Double)}
+     */
+    private Double getZScoreNormalizedValue(Double value) {
+        Double mean = welfordAggregate.getMean();
+        Double std = welfordAggregate.getSampleStd();
+        return (value - mean) / std;
+    }
+
+    /**
+     * Adds a normalized value to both cumulative sums. NaN values leave the sums untouched.
+     *
+     * @param newValue the z-score normalized observation
+     */
+    private void updateStatistics(Double newValue) {
+        if (newValue.isNaN()) {
+            return;
+        }
+        // The upper sum is clamped at zero from below, the lower sum from above.
+        this.cuSumHigh = Math.max(0, this.cuSumHigh + newValue - k);
+        this.cuSumLow = Math.min(0, this.cuSumLow + newValue + k);
+    }
+
+    /**
+     * Tests a cumulative sum against a threshold.
+     *
+     * @param cusum     the cumulative sum to test
+     * @param threshold the alarm threshold
+     * @return {@code true} if the absolute value of {@code cusum} exceeds {@code threshold}
+     */
+    private Boolean getTestResult(Double cusum, Double threshold) {
+        // Use the argument instead of silently falling back to the field of the same name.
+        return Math.abs(cusum) > threshold;
+    }
+
+    /**
+     * Appends the sums and the test results to the event.
+     *
+     * @param event        the event to enrich (modified in place)
+     * @param cusumLow     current lower cumulative sum
+     * @param cusumHigh    current upper cumulative sum
+     * @param decisionLow  result of the test on the lower sum
+     * @param decisionHigh result of the test on the upper sum
+     * @return the same event instance that was passed in
+     */
+    private Event updateEvent(Event event, Double cusumLow, Double cusumHigh, Boolean decisionLow, Boolean decisionHigh) {
+        event.addField(WelfordEventFields.VAL_LOW.label, cusumLow);
+        event.addField(WelfordEventFields.VAL_HIGH.label, cusumHigh);
+        event.addField(WelfordEventFields.DECISION_LOW.label, decisionLow);
+        event.addField(WelfordEventFields.DECISION_HIGH.label, decisionHigh);
+        return event;
+    }
+
+    /** Sets both cumulative sums to zero and starts a fresh running-statistics aggregate. */
+    private void resetState() {
+        this.cuSumLow = 0.0;
+        this.cuSumHigh = 0.0;
+        this.welfordAggregate = new WelfordAggregate();
+    }
+}
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
new file mode 100644
index 000000000..b4e7098dc
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
@@ -0,0 +1,19 @@
+package org.apache.streampipes.processors.changedetection.jvm.welford;
+
+/**
+ * Names of the fields that the Welford change detection processor appends to an event.
+ * Each constant carries the field name in {@link #label}.
+ */
+public enum WelfordEventFields {
+    VAL_LOW("cumSumLow"),
+    VAL_HIGH("cumSumHigh"),
+    DECISION_LOW("changeDetectedLow"),
+    DECISION_HIGH("changeDetectedHigh");
+
+    /** The field name associated with this constant. */
+    public final String label;
+
+    WelfordEventFields(final String fieldName) {
+        label = fieldName;
+    }
+
+    /**
+     * @return the field name, identical to {@link #label}
+     */
+    @Override
+    public String toString() {
+        return label;
+    }
+}
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png
new file mode 100644
index 000000000..444fa4e2c
Binary files /dev/null and b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png differ
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
new file mode 100644
index 000000000..9dfaf814d
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
@@ -0,0 +1,59 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## Cusum (Cumulative Sum)
+
+<!--
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+-->
+
+***
+
+## Description
+
+Performs change detection on a single dimension of the incoming data stream. A change is detected if the cumulative deviation from the mean exceeds a certain threshold. This implementation tracks the mean and the standard deviation using Welford's algorithm, which is well suited for data streams.
+
+***
+
+## Required input
+
+The Welford change detection processor requires a data stream that has at least one field containing a numerical value.
+
+***
+
+## Configuration
+
+### Value to observe
+Specify the dimension of the data stream (e.g. the temperature) on which to perform change detection. 
+
+### Parameter `k`
+`k` controls the sensitivity of the change detector. Its unit is standard deviations. For an observation `x_n`, the Cusum value is `S_n = max(0, S_{n-1} + z-score(x_n) - k)`. Thus, the Cusum score `S` increases if `z-score(x_n) > k`.
+
+### Parameter `h`
+The alarm threshold in standard deviations. An alarm occurs if the absolute value of a Cusum score exceeds `h`, i.e. `|S_n| > h`.
+
+## Output
+
+This processor outputs the original data stream plus 
+
+- `cumSumLow`: The cusum value for negative changes
+- `cumSumHigh`: The cusum value for positive changes
+- `changeDetectedLow`: Boolean indicating if a negative change was detected
+- `changeDetectedHigh`: Boolean indicating if a positive change was detected
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png
new file mode 100644
index 000000000..444fa4e2c
Binary files /dev/null and b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png differ
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
new file mode 100644
index 000000000..72b38eee7
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
@@ -0,0 +1,11 @@
+org.apache.streampipes.processors.changedetection.jvm.welford.title=Cusum
+org.apache.streampipes.processors.changedetection.jvm.welford.description=
+
+number-mapping.title=Value to observe
+number-mapping.description=Specifies the monitored dimension.
+
+param-k.title=Parameter k
+param-k.description=The sensitivity parameter. High value indicates low sensitivity. Unit: Standard deviations
+
+param-h.title=Parameter h
+param-h.description=The threshold above which a change is detected. Unit: Standard deviations
\ No newline at end of file
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
index 9cba63920..7bbc6d84c 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public abstract class StandaloneEventProcessingDeclarer<B extends
         EventProcessorBindingParams> extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
index f00f12dc9..79971e03a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public abstract class EventProcessorBindingParams extends
         BindingParams<DataProcessorInvocation> implements
         Serializable {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
index bee014951..f6538321a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public interface EventProcessor<B extends EventProcessorBindingParams> extends
         PipelineElement<B, DataProcessorInvocation> {