You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/11/03 19:26:31 UTC

[incubator-streampipes] branch chore/refactor-processing-elemets updated (f78f18467 -> 18c306c55)

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a change to branch chore/refactor-processing-elemets
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


 discard f78f18467 refactor cumsum processor
     new 18c306c55 refactor cumsum processor

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f78f18467)
            \
             N -- N -- N   refs/heads/chore/refactor-processing-elemets (18c306c55)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[incubator-streampipes] 01/01: refactor cumsum processor

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/refactor-processing-elemets
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 18c306c55437ff1e12bbeea165102da324f94e54
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Nov 3 20:22:34 2022 +0100

    refactor cumsum processor
    
    Co-authored-by: smlabt <gi...@samuelabt.de>
---
 .../jvm/ChangeDetectionJvmInit.java                |   6 +-
 .../changedetection/jvm/cusum/Cusum.java           |   1 +
 .../changedetection/jvm/cusum/CusumController.java |   1 +
 .../jvm/cusum/CusumEventFields.java                |   2 +
 .../changedetection/jvm/cusum/CusumParameters.java |   1 +
 .../jvm/cusum/WelfordAggregate.java                |   1 +
 .../jvm/{cusum => welford}/WelfordAggregate.java   |   6 +-
 .../jvm/welford/WelfordChangeDetection.java        | 132 +++++++++++++++++++++
 .../jvm/welford/WelfordEventFields.java            |  19 +++
 .../icon.png                                       | Bin 0 -> 62957 bytes
 .../documentation.md                               |  59 +++++++++
 .../icon.png                                       | Bin 0 -> 62957 bytes
 .../strings.en                                     |  11 ++
 .../StandaloneEventProcessingDeclarer.java         |   1 +
 .../binding/EventProcessorBindingParams.java       |   1 +
 .../wrapper/runtime/EventProcessor.java            |   1 +
 16 files changed, 236 insertions(+), 6 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
index ef55d2832..c561bf9b7 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumController;
+import org.apache.streampipes.processors.changedetection.jvm.welford.WelfordChangeDetection;
 
 public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {
 
@@ -43,7 +44,10 @@ public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {
                 "Processors Change Detection JVM",
                 "",
                 8090)
-                .registerPipelineElements(new CusumController())
+                .registerPipelineElements(
+                        new CusumController(),
+                        new WelfordChangeDetection()
+                )
                 .registerMessagingFormats(
                         new JsonDataFormatFactory(),
                         new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
index 8c997b833..b65a01e6e 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 
+@Deprecated(since="0.70.0", forRemoval = true)
 public class Cusum implements EventProcessor<CusumParameters> {
 
     private String selectedNumberMapping;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
index 5fad75db9..8028fced5 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
 
 import java.util.Arrays;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumController extends StandaloneEventProcessingDeclarer<CusumParameters> {
 
     private static final String NUMBER_MAPPING = "number-mapping";
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
index 555c38964..f3fff0b30 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
@@ -18,6 +18,8 @@
 
 package org.apache.streampipes.processors.changedetection.jvm.cusum;
 
+
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumEventFields {
 
     public static final String VAL_LOW = "cusumLow";
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
index 901f5c734..d243f7a53 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.processors.changedetection.jvm.cusum;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumParameters  extends EventProcessorBindingParams {
 
     private String selectedNumberMapping;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
index 3479bb7c3..019c494c1 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.processors.changedetection.jvm.cusum;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class WelfordAggregate {
     private Integer count;
     private Double mean;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
similarity index 90%
copy from streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
copy to streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
index 3479bb7c3..e447c318b 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.processors.changedetection.jvm.cusum;
+package org.apache.streampipes.processors.changedetection.jvm.welford;
 
 public class WelfordAggregate {
     private Integer count;
@@ -52,8 +52,4 @@ public class WelfordAggregate {
     public Double getSampleStd() {
         return Math.sqrt(getSampleVariance());
     }
-
-    public Double getPopulationStd() {
-        return Math.sqrt(getPopulationVariance());
-    }
 }
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
new file mode 100644
index 000000000..e084377fd
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
@@ -0,0 +1,132 @@
+package org.apache.streampipes.processors.changedetection.jvm.welford;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumEventFields;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.Arrays;
+
+public class WelfordChangeDetection extends StreamPipesDataProcessor {
+
+    private static final String NUMBER_MAPPING = "number-mapping";
+    private static final String PARAM_K = "param-k";
+    private static final String PARAM_H = "param-h";
+
+    private String selectedNumberMapping;
+    private Double k;
+    private Double h;
+    private Double cuSumLow;
+    private Double cuSumHigh;
+    private WelfordAggregate welfordAggregate;
+
+    @Override
+    public DataProcessorDescription declareModel() {
+        return ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford")
+                .category(DataProcessorType.VALUE_OBSERVER)
+                .withAssets(Assets.DOCUMENTATION)
+                .withAssets(Assets.ICON)
+                .withLocales(Locales.EN)
+                .requiredStream(StreamRequirementsBuilder
+                        .create()
+                        .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                                Labels.withId(NUMBER_MAPPING),
+                                PropertyScope.NONE).build())
+                .requiredFloatParameter(Labels.withId(PARAM_K), 0.0f, 0.0f, 100.0f, 0.01f)
+                .requiredFloatParameter(Labels.withId(PARAM_H), 0.0f, 0.0f, 100.0f, 0.01f)
+                .outputStrategy(
+                        OutputStrategies.append(
+                                Arrays.asList(
+                                        EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_LOW.label, SO.Number),
+                                        EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_HIGH.label, SO.Number),
+                                        EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_LOW.label, SO.Boolean),
+                                        EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_HIGH.label, SO.Boolean)
+                                )
+                        ))
+                .build();
+    }
+
+    @Override
+    public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+
+        ProcessingElementParameterExtractor extractor = parameters.extractor();
+        this.selectedNumberMapping = extractor.singleValueParameter(NUMBER_MAPPING, String.class);
+        this.k = extractor.singleValueParameter(PARAM_K, Double.class);
+        this.h = extractor.singleValueParameter(PARAM_H, Double.class);
+        this.cuSumLow = 0.0;
+        this.cuSumHigh = 0.0;
+        this.welfordAggregate = new WelfordAggregate();
+
+    }
+
+    @Override
+    public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+
+        Double number = event.getFieldBySelector(selectedNumberMapping).getAsPrimitive().getAsDouble();
+        welfordAggregate.update(number);  // update mean and standard deviation
+        Double normalized = getZScoreNormalizedValue(number);
+        updateStatistics(normalized);
+
+
+        Boolean isChangeHigh = getTestResult(this.cuSumHigh, h);
+        Boolean isChangeLow = getTestResult(this.cuSumLow, h);
+
+        Event updatedEvent = updateEvent(event, this.cuSumLow, this.cuSumHigh, isChangeLow, isChangeHigh);
+        collector.collect(updatedEvent);
+
+        if (isChangeHigh || isChangeLow) {
+            resetAfterChange();
+        }
+
+    }
+
+    @Override
+    public void onDetach() throws SpRuntimeException {
+        this.cuSumLow = 0.0;
+        this.cuSumHigh = 0.0;
+    }
+
+    private Double getZScoreNormalizedValue(Double value) {
+        Double mean = welfordAggregate.getMean();
+        Double std = welfordAggregate.getSampleStd();
+        return (value - mean) / std;
+    }
+
+    private void updateStatistics(Double newValue) {
+        if (newValue.isNaN()) {
+            return;
+        }
+        this.cuSumHigh = Math.max(0, this.cuSumHigh + newValue - k);
+        this.cuSumLow = Math.min(0, this.cuSumLow + newValue + k);
+    }
+
+    private Boolean getTestResult(Double cusum, Double h) {
+        return Math.abs(cusum) > this.h;
+    }
+
+    private Event updateEvent(Event event, Double cusumLow, Double cusumHigh, Boolean decisionLow, Boolean decisionHigh) {
+        event.addField(WelfordEventFields.VAL_LOW.label, cusumLow);
+        event.addField(WelfordEventFields.VAL_HIGH.label, cusumHigh);
+        event.addField(WelfordEventFields.DECISION_LOW.label, decisionLow);
+        event.addField(WelfordEventFields.DECISION_HIGH.label, decisionHigh);
+        return event;
+    }
+
+    private void resetAfterChange() {
+        this.cuSumHigh = 0.0;
+        this.cuSumLow = 0.0;
+        welfordAggregate = new WelfordAggregate();
+    }
+}
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
new file mode 100644
index 000000000..b4e7098dc
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
@@ -0,0 +1,19 @@
+package org.apache.streampipes.processors.changedetection.jvm.welford;
+
+public enum WelfordEventFields {
+    VAL_LOW("cumSumLow"),
+    VAL_HIGH("cumSumHigh"),
+    DECISION_LOW("changeDetectedLow"),
+    DECISION_HIGH("changeDetectedHigh");
+
+    public final String label;
+
+    WelfordEventFields(String label) {
+        this.label = label;
+    }
+
+    @Override
+    public String toString() {
+        return this.label;
+    }
+}
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png
new file mode 100644
index 000000000..444fa4e2c
Binary files /dev/null and b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png differ
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
new file mode 100644
index 000000000..9dfaf814d
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
@@ -0,0 +1,59 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## Cusum (Cumulative Sum)
+
+<!--
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+-->
+
+***
+
+## Description
+
+Performs change detection on a single dimension of the incoming data stream. A change is detected if the cumulative deviation from the mean exceeds a certain threshold. This implementation tracks the mean and the standard deviation using Welford's algorithm, which is well suited for data streams.
+
+***
+
+## Required input
+
+The Welford change detection processor requires a data stream that has at least one field containing a numerical value.
+
+***
+
+## Configuration
+
+### Value to observe
+Specify the dimension of the data stream (e.g. the temperature) on which to perform change detection. 
+
+### Parameter `k`
+`k` controls the sensitivity of the change detector. Its unit is standard deviations. For an observation `x_n`, the Cusum value for positive changes is `S_n = max(0, S_{n-1} + z-score(x_n) - k)`. Thus, the cusum-score `S` increases if `z-score(x_n) > k`.
+
+### Parameter `h`
+The alarm threshold in standard deviations. An alarm occurs if `S_n > h`.
+
+## Output
+
+This processor outputs the original data stream plus 
+
+- `cumSumLow`: The cusum value for negative changes
+- `cumSumHigh`: The cusum value for positive changes
+- `changeDetectedLow`: Boolean indicating if a negative change was detected
+- `changeDetectedHigh`: Boolean indicating if a positive change was detected
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png
new file mode 100644
index 000000000..444fa4e2c
Binary files /dev/null and b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png differ
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
new file mode 100644
index 000000000..72b38eee7
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
@@ -0,0 +1,11 @@
+org.apache.streampipes.processors.changedetection.jvm.cusum.title=Cusum
+org.apache.streampipes.processors.changedetection.jvm.cusum.description=
+
+number-mapping.title=Value to observe
+number-mapping.description=Specifies the monitored dimension.
+
+param-k.title=Parameter k
+param-k.description=The sensitivity parameter. High value indicates low sensitivity. Unit: Standard deviations
+
+param-h.title=Parameter h
+param-h.description=The threshold above which a change is detected. Unit: Standard deviations
\ No newline at end of file
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
index 9cba63920..7bbc6d84c 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public abstract class StandaloneEventProcessingDeclarer<B extends
         EventProcessorBindingParams> extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
index f00f12dc9..79971e03a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public abstract class EventProcessorBindingParams extends
         BindingParams<DataProcessorInvocation> implements
         Serializable {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
index bee014951..f6538321a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public interface EventProcessor<B extends EventProcessorBindingParams> extends
         PipelineElement<B, DataProcessorInvocation> {