You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/11/05 14:59:29 UTC

[incubator-streampipes] branch chore/STREAMPIPES-616 updated (c766f5d3e -> 94f7671a0)

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a change to branch chore/STREAMPIPES-616
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


 discard c766f5d3e [STREAMPIPES-616] refactor cumsum processor
     new 94f7671a0 [STREAMPIPES-616] refactor cumsum processor

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c766f5d3e)
            \
             N -- N -- N   refs/heads/chore/STREAMPIPES-616 (94f7671a0)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../documentation.md                                                  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)


[incubator-streampipes] 01/01: [STREAMPIPES-616] refactor cumsum processor

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/STREAMPIPES-616
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 94f7671a064fb421e6575f450d49653fc0d4dd8f
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Nov 3 20:22:34 2022 +0100

    [STREAMPIPES-616] refactor cumsum processor
    
    Co-authored-by: smlabt <gi...@samuelabt.de>
---
 .../jvm/ChangeDetectionJvmInit.java                |   6 +-
 .../changedetection/jvm/cusum/Cusum.java           |   1 +
 .../changedetection/jvm/cusum/CusumController.java |   2 +
 .../jvm/cusum/CusumEventFields.java                |   2 +
 .../changedetection/jvm/cusum/CusumParameters.java |   1 +
 .../jvm/cusum/WelfordAggregate.java                |   1 +
 .../jvm/{cusum => welford}/WelfordAggregate.java   |   6 +-
 .../jvm/welford/WelfordChangeDetection.java        | 133 +++++++++++++++++++++
 .../WelfordEventFields.java}                       |  22 +++-
 .../documentation.md                               |   4 +-
 .../icon.png                                       | Bin 0 -> 62957 bytes
 .../strings.en                                     |   4 +-
 .../documentation.md                               |  10 +-
 .../icon.png                                       | Bin 0 -> 62957 bytes
 .../strings.en                                     |   4 +-
 .../StandaloneEventProcessingDeclarer.java         |   1 +
 .../binding/EventProcessorBindingParams.java       |   1 +
 .../wrapper/runtime/EventProcessor.java            |   1 +
 18 files changed, 177 insertions(+), 22 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
index ef55d2832..c561bf9b7 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumController;
+import org.apache.streampipes.processors.changedetection.jvm.welford.WelfordChangeDetection;
 
 public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {
 
@@ -43,7 +44,10 @@ public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {
                 "Processors Change Detection JVM",
                 "",
                 8090)
-                .registerPipelineElements(new CusumController())
+                .registerPipelineElements(
+                        new CusumController(),
+                        new WelfordChangeDetection()
+                )
                 .registerMessagingFormats(
                         new JsonDataFormatFactory(),
                         new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
index 8c997b833..b65a01e6e 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 
+@Deprecated(since="0.70.0", forRemoval = true)
 public class Cusum implements EventProcessor<CusumParameters> {
 
     private String selectedNumberMapping;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
index 5fad75db9..50497b505 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
 
 import java.util.Arrays;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumController extends StandaloneEventProcessingDeclarer<CusumParameters> {
 
     private static final String NUMBER_MAPPING = "number-mapping";
@@ -44,6 +45,7 @@ public class CusumController extends StandaloneEventProcessingDeclarer<CusumPara
         return ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.cusum")
                 .category(DataProcessorType.VALUE_OBSERVER)
                 .withAssets(Assets.DOCUMENTATION)
+                .withAssets(Assets.ICON)
                 .withLocales(Locales.EN)
                 .requiredStream(StreamRequirementsBuilder
                         .create()
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
index 555c38964..f3fff0b30 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
@@ -18,6 +18,8 @@
 
 package org.apache.streampipes.processors.changedetection.jvm.cusum;
 
+
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumEventFields {
 
     public static final String VAL_LOW = "cusumLow";
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
index 901f5c734..d243f7a53 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.processors.changedetection.jvm.cusum;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumParameters  extends EventProcessorBindingParams {
 
     private String selectedNumberMapping;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
index 3479bb7c3..019c494c1 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.processors.changedetection.jvm.cusum;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class WelfordAggregate {
     private Integer count;
     private Double mean;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
similarity index 90%
copy from streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
copy to streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
index 3479bb7c3..e447c318b 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.processors.changedetection.jvm.cusum;
+package org.apache.streampipes.processors.changedetection.jvm.welford;
 
 public class WelfordAggregate {
     private Integer count;
@@ -52,8 +52,4 @@ public class WelfordAggregate {
     public Double getSampleStd() {
         return Math.sqrt(getSampleVariance());
     }
-
-    public Double getPopulationStd() {
-        return Math.sqrt(getPopulationVariance());
-    }
 }
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
new file mode 100644
index 000000000..50a778aab
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
@@ -0,0 +1,133 @@
+package org.apache.streampipes.processors.changedetection.jvm.welford;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumEventFields;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class WelfordChangeDetection extends StreamPipesDataProcessor {
+
+    private static final String NUMBER_MAPPING = "number-mapping";
+    private static final String PARAM_K = "param-k";
+    private static final String PARAM_H = "param-h";
+
+    private String selectedNumberMapping;
+    private Double k;
+    private Double h;
+    private Double cuSumLow;
+    private Double cuSumHigh;
+    private WelfordAggregate welfordAggregate;
+
+    @Override
+    public DataProcessorDescription declareModel() {
+        return ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford")
+                .category(DataProcessorType.VALUE_OBSERVER)
+                .withAssets(Assets.DOCUMENTATION)
+                .withAssets(Assets.ICON)
+                .withLocales(Locales.EN)
+                .requiredStream(StreamRequirementsBuilder
+                        .create()
+                        .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                                Labels.withId(NUMBER_MAPPING),
+                                PropertyScope.NONE).build())
+                .requiredFloatParameter(Labels.withId(PARAM_K), 0.0f, 0.0f, 100.0f, 0.01f)
+                .requiredFloatParameter(Labels.withId(PARAM_H), 0.0f, 0.0f, 100.0f, 0.01f)
+                .outputStrategy(
+                        OutputStrategies.append(
+                                Arrays.asList(
+                                        EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_LOW.label, SO.Number),
+                                        EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_HIGH.label, SO.Number),
+                                        EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_LOW.label, SO.Boolean),
+                                        EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_HIGH.label, SO.Boolean)
+                                )
+                        ))
+                .build();
+    }
+
+    @Override
+    public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+
+        ProcessingElementParameterExtractor extractor = parameters.extractor();
+        this.selectedNumberMapping = extractor.mappingPropertyValue(NUMBER_MAPPING);
+        this.k = extractor.singleValueParameter(PARAM_K, Double.class);
+        this.h = extractor.singleValueParameter(PARAM_H, Double.class);
+        this.cuSumLow = 0.0;
+        this.cuSumHigh = 0.0;
+        this.welfordAggregate = new WelfordAggregate();
+
+    }
+
+    @Override
+    public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+
+        Double number = event.getFieldBySelector(selectedNumberMapping).getAsPrimitive().getAsDouble();
+        welfordAggregate.update(number);  // update mean and standard deviation
+        Double normalized = getZScoreNormalizedValue(number);
+        updateStatistics(normalized);
+
+
+        Boolean isChangeHigh = getTestResult(this.cuSumHigh, h);
+        Boolean isChangeLow = getTestResult(this.cuSumLow, h);
+
+        Event updatedEvent = updateEvent(event, this.cuSumLow, this.cuSumHigh, isChangeLow, isChangeHigh);
+        collector.collect(updatedEvent);
+
+        if (isChangeHigh || isChangeLow) {
+            resetAfterChange();
+        }
+
+    }
+
+    @Override
+    public void onDetach() throws SpRuntimeException {
+        this.cuSumLow = 0.0;
+        this.cuSumHigh = 0.0;
+    }
+
+    private Double getZScoreNormalizedValue(Double value) {
+        Double mean = welfordAggregate.getMean();
+        Double std = welfordAggregate.getSampleStd();
+        return (value - mean) / std;
+    }
+
+    private void updateStatistics(Double newValue) {
+        if (newValue.isNaN()) {
+            return;
+        }
+        this.cuSumHigh = Math.max(0, this.cuSumHigh + newValue - k);
+        this.cuSumLow = Math.min(0, this.cuSumLow + newValue + k);
+    }
+
+    private Boolean getTestResult(Double cusum, Double h) {
+        return Math.abs(cusum) > this.h;
+    }
+
+    private Event updateEvent(Event event, Double cusumLow, Double cusumHigh, Boolean decisionLow, Boolean decisionHigh) {
+        event.addField(WelfordEventFields.VAL_LOW.label, cusumLow);
+        event.addField(WelfordEventFields.VAL_HIGH.label, cusumHigh);
+        event.addField(WelfordEventFields.DECISION_LOW.label, decisionLow);
+        event.addField(WelfordEventFields.DECISION_HIGH.label, decisionHigh);
+        return event;
+    }
+
+    private void resetAfterChange() {
+        this.cuSumHigh = 0.0;
+        this.cuSumLow = 0.0;
+        welfordAggregate = new WelfordAggregate();
+    }
+}
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
similarity index 65%
copy from streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
copy to streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
index 555c38964..f1c87991c 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
@@ -16,12 +16,22 @@
  *
  */
 
-package org.apache.streampipes.processors.changedetection.jvm.cusum;
+package org.apache.streampipes.processors.changedetection.jvm.welford;
 
-public class CusumEventFields {
+public enum WelfordEventFields {
+    VAL_LOW("cumSumLow"),
+    VAL_HIGH("cumSumHigh"),
+    DECISION_LOW("changeDetectedLow"),
+    DECISION_HIGH("changeDetectedHigh");
 
-    public static final String VAL_LOW = "cusumLow";
-    public static final String VAL_HIGH = "cusumHigh";
-    public static final String DECISION_LOW = "changeDetectedLow";
-    public static final String DECISION_HIGH = "changeDetectedHigh";
+    public final String label;
+
+    WelfordEventFields(String label) {
+        this.label = label;
+    }
+
+    @Override
+    public String toString() {
+        return this.label;
+    }
 }
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/documentation.md b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/documentation.md
index 0803b7f8e..ea352f283 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/documentation.md
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/documentation.md
@@ -16,7 +16,7 @@
   ~
   -->
 
-## Cusum (Cumulative Sum)
+## Cusum (Cumulative Sum) - Deprecated
 
 <!--
 <p align="center"> 
@@ -28,6 +28,8 @@
 
 ## Description
 
+**This processing element is deprecated; please use the `WelfordChangeDetection` processing element instead.**
+
 Performs change detection on a single dimension of the incoming data stream. A change is detected if the cumulative deviation from the mean exceeds a certain threshold. This implementation tracks the mean and the standard deviation using Welford's algorithm, which is well suited for data streams.
 
 ***
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png
new file mode 100644
index 000000000..444fa4e2c
Binary files /dev/null and b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png differ
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en
index 72b38eee7..cea9a13f8 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en
@@ -1,5 +1,5 @@
-org.apache.streampipes.processors.changedetection.jvm.cusum.title=Cusum
-org.apache.streampipes.processors.changedetection.jvm.cusum.description=
+org.apache.streampipes.processors.changedetection.jvm.cusum.title=Cusum (Deprecated)
+org.apache.streampipes.processors.changedetection.jvm.cusum.description=This processor is deprecated; please use the processing element `WelfordChangeDetection` instead.
 
 number-mapping.title=Value to observe
 number-mapping.description=Specifies the monitored dimension.
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/documentation.md b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
similarity index 77%
copy from streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/documentation.md
copy to streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
index 0803b7f8e..a7d08188b 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/documentation.md
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
@@ -16,7 +16,7 @@
   ~
   -->
 
-## Cusum (Cumulative Sum)
+## Welford Change Detection
 
 <!--
 <p align="center"> 
@@ -28,13 +28,13 @@
 
 ## Description
 
-Performs change detection on a single dimension of the incoming data stream. A change is detected if the cumulative deviation from the mean exceeds a certain threshold. This implementation tracks the mean and the standard deviation using Welford's algorithm, which is well suited for data streams.
+Performs change detection on a single dimension of the incoming data stream. This implementation tracks the mean and the standard deviation using Welford's algorithm, which is well suited for data streams. A change is detected if the cumulative deviation from the mean exceeds a certain threshold.
 
 ***
 
 ## Required input
 
-The cusum processor requires a data stream that has at least one field containing a numerical value.
+The Welford change detection processor requires a data stream that has at least one field containing a numerical value.
 
 ***
 
@@ -53,7 +53,7 @@ The alarm theshold in standard deviations. An alarm occurs if `S_n > h`
 
 This processor outputs the original data stream plus 
 
-- `cusumLow`: The cusum value for negative changes
-- `cusumHigh`: The cusum value for positive changes
+- `cumSumLow`: The cumulative sum value for negative changes
+- `cumSumHigh`: The cumulative sum value for positive changes
 - `changeDetectedLow`: Boolean indicating if a negative change was detected
 - `changeDetectedHigh`: Boolean indicating if a positive change was detected
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png
new file mode 100644
index 000000000..444fa4e2c
Binary files /dev/null and b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png differ
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
similarity index 55%
copy from streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en
copy to streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
index 72b38eee7..f6e75a14c 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
@@ -1,5 +1,5 @@
-org.apache.streampipes.processors.changedetection.jvm.cusum.title=Cusum
-org.apache.streampipes.processors.changedetection.jvm.cusum.description=
+org.apache.streampipes.processors.changedetection.jvm.welford.title=Welford Change Detection
+org.apache.streampipes.processors.changedetection.jvm.welford.description=Change detection for a time series based on Welford's algorithm.
 
 number-mapping.title=Value to observe
 number-mapping.description=Specifies the monitored dimension.
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
index 9cba63920..7bbc6d84c 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public abstract class StandaloneEventProcessingDeclarer<B extends
         EventProcessorBindingParams> extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
index f00f12dc9..79971e03a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public abstract class EventProcessorBindingParams extends
         BindingParams<DataProcessorInvocation> implements
         Serializable {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
index bee014951..f6538321a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public interface EventProcessor<B extends EventProcessorBindingParams> extends
         PipelineElement<B, DataProcessorInvocation> {