You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/11/03 20:34:02 UTC

[incubator-streampipes] branch chore/STREAMPIPES-616 created (now a82b0269f)

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a change to branch chore/STREAMPIPES-616
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


      at a82b0269f [STREAMPIPES-616] refactor cumsum processor

This branch includes the following new commits:

     new a82b0269f [STREAMPIPES-616] refactor cumsum processor

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-streampipes] 01/01: [STREAMPIPES-616] refactor cumsum processor

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/STREAMPIPES-616
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit a82b0269f6e266e7c9dd43968541b71796f42fc0
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Nov 3 20:22:34 2022 +0100

    [STREAMPIPES-616] refactor cumsum processor
    
    Co-authored-by: smlabt <gi...@samuelabt.de>
---
 .../jvm/ChangeDetectionJvmInit.java                |   6 +-
 .../changedetection/jvm/cusum/Cusum.java           |   1 +
 .../changedetection/jvm/cusum/CusumController.java |   1 +
 .../jvm/cusum/CusumEventFields.java                |   2 +
 .../changedetection/jvm/cusum/CusumParameters.java |   1 +
 .../jvm/cusum/WelfordAggregate.java                |   1 +
 .../jvm/{cusum => welford}/WelfordAggregate.java   |   6 +-
 .../jvm/welford/WelfordChangeDetection.java        | 150 +++++++++++++++++++++
 .../WelfordEventFields.java}                       |  22 ++-
 .../icon.png                                       | Bin 0 -> 62957 bytes
 .../strings.en                                     |   4 +-
 .../documentation.md                               |  59 ++++++++
 .../icon.png                                       | Bin 0 -> 62957 bytes
 .../strings.en                                     |   0
 .../StandaloneEventProcessingDeclarer.java         |   1 +
 .../binding/EventProcessorBindingParams.java       |   1 +
 .../wrapper/runtime/EventProcessor.java            |   1 +
 17 files changed, 242 insertions(+), 14 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
index ef55d2832..c561bf9b7 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/ChangeDetectionJvmInit.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumController;
+import org.apache.streampipes.processors.changedetection.jvm.welford.WelfordChangeDetection;
 
 public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {
 
@@ -43,7 +44,10 @@ public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {
                 "Processors Change Detection JVM",
                 "",
                 8090)
-                .registerPipelineElements(new CusumController())
+                .registerPipelineElements(
+                        new CusumController(),
+                        new WelfordChangeDetection()
+                )
                 .registerMessagingFormats(
                         new JsonDataFormatFactory(),
                         new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
index 8c997b833..b65a01e6e 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/Cusum.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 
+@Deprecated(since="0.70.0", forRemoval = true)
 public class Cusum implements EventProcessor<CusumParameters> {
 
     private String selectedNumberMapping;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
index 5fad75db9..8028fced5 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumController.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
 
 import java.util.Arrays;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumController extends StandaloneEventProcessingDeclarer<CusumParameters> {
 
     private static final String NUMBER_MAPPING = "number-mapping";
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
index 555c38964..f3fff0b30 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
@@ -18,6 +18,8 @@
 
 package org.apache.streampipes.processors.changedetection.jvm.cusum;
 
+
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumEventFields {
 
     public static final String VAL_LOW = "cusumLow";
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
index 901f5c734..d243f7a53 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumParameters.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.processors.changedetection.jvm.cusum;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class CusumParameters  extends EventProcessorBindingParams {
 
     private String selectedNumberMapping;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
index 3479bb7c3..019c494c1 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.processors.changedetection.jvm.cusum;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public class WelfordAggregate {
     private Integer count;
     private Double mean;
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
similarity index 90%
copy from streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
copy to streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
index 3479bb7c3..e447c318b 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/WelfordAggregate.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordAggregate.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.processors.changedetection.jvm.cusum;
+package org.apache.streampipes.processors.changedetection.jvm.welford;
 
 public class WelfordAggregate {
     private Integer count;
@@ -52,8 +52,4 @@ public class WelfordAggregate {
     public Double getSampleStd() {
         return Math.sqrt(getSampleVariance());
     }
-
-    public Double getPopulationStd() {
-        return Math.sqrt(getPopulationVariance());
-    }
 }
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
new file mode 100644
index 000000000..d2c468714
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.changedetection.jvm.welford;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumEventFields;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.Arrays;
+
+public class WelfordChangeDetection extends StreamPipesDataProcessor {
+
+    private static final String NUMBER_MAPPING = "number-mapping";
+    private static final String PARAM_K = "param-k";
+    private static final String PARAM_H = "param-h";
+
+    private String selectedNumberMapping;
+    private Double k;
+    private Double h;
+    private Double cuSumLow;
+    private Double cuSumHigh;
+    private WelfordAggregate welfordAggregate;
+
+    @Override
+    public DataProcessorDescription declareModel() {
+        return ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford")
+                .category(DataProcessorType.VALUE_OBSERVER)
+                .withAssets(Assets.DOCUMENTATION)
+                .withAssets(Assets.ICON)
+                .withLocales(Locales.EN)
+                .requiredStream(StreamRequirementsBuilder
+                        .create()
+                        .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                                Labels.withId(NUMBER_MAPPING),
+                                PropertyScope.NONE).build())
+                .requiredFloatParameter(Labels.withId(PARAM_K), 0.0f, 0.0f, 100.0f, 0.01f)
+                .requiredFloatParameter(Labels.withId(PARAM_H), 0.0f, 0.0f, 100.0f, 0.01f)
+                .outputStrategy(
+                        OutputStrategies.append(
+                                Arrays.asList(
+                                        EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_LOW.label, SO.Number),
+                                        EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_HIGH.label, SO.Number),
+                                        EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_LOW.label, SO.Boolean),
+                                        EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_HIGH.label, SO.Boolean)
+                                )
+                        ))
+                .build();
+    }
+
+    @Override
+    public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+
+        ProcessingElementParameterExtractor extractor = parameters.extractor();
+        this.selectedNumberMapping = extractor.singleValueParameter(NUMBER_MAPPING, String.class);
+        this.k = extractor.singleValueParameter(PARAM_K, Double.class);
+        this.h = extractor.singleValueParameter(PARAM_H, Double.class);
+        this.cuSumLow = 0.0;
+        this.cuSumHigh = 0.0;
+        this.welfordAggregate = new WelfordAggregate();
+
+    }
+
+    @Override
+    public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+
+        Double number = event.getFieldBySelector(selectedNumberMapping).getAsPrimitive().getAsDouble();
+        welfordAggregate.update(number);  // update mean and standard deviation
+        Double normalized = getZScoreNormalizedValue(number);
+        updateStatistics(normalized);
+
+
+        Boolean isChangeHigh = getTestResult(this.cuSumHigh, h);
+        Boolean isChangeLow = getTestResult(this.cuSumLow, h);
+
+        Event updatedEvent = updateEvent(event, this.cuSumLow, this.cuSumHigh, isChangeLow, isChangeHigh);
+        collector.collect(updatedEvent);
+
+        if (isChangeHigh || isChangeLow) {
+            resetAfterChange();
+        }
+
+    }
+
+    @Override
+    public void onDetach() throws SpRuntimeException {
+        this.cuSumLow = 0.0;
+        this.cuSumHigh = 0.0;
+    }
+
+    private Double getZScoreNormalizedValue(Double value) {
+        Double mean = welfordAggregate.getMean();
+        Double std = welfordAggregate.getSampleStd();
+        return (value - mean) / std;
+    }
+
+    private void updateStatistics(Double newValue) {
+        if (newValue.isNaN()) {
+            return;
+        }
+        this.cuSumHigh = Math.max(0, this.cuSumHigh + newValue - k);
+        this.cuSumLow = Math.min(0, this.cuSumLow + newValue + k);
+    }
+
+    private Boolean getTestResult(Double cusum, Double h) {
+        return Math.abs(cusum) > this.h;
+    }
+
+    private Event updateEvent(Event event, Double cusumLow, Double cusumHigh, Boolean decisionLow, Boolean decisionHigh) {
+        event.addField(WelfordEventFields.VAL_LOW.label, cusumLow);
+        event.addField(WelfordEventFields.VAL_HIGH.label, cusumHigh);
+        event.addField(WelfordEventFields.DECISION_LOW.label, decisionLow);
+        event.addField(WelfordEventFields.DECISION_HIGH.label, decisionHigh);
+        return event;
+    }
+
+    private void resetAfterChange() {
+        this.cuSumHigh = 0.0;
+        this.cuSumLow = 0.0;
+        welfordAggregate = new WelfordAggregate();
+    }
+}
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
similarity index 65%
copy from streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
copy to streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
index 555c38964..f1c87991c 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/cusum/CusumEventFields.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordEventFields.java
@@ -16,12 +16,22 @@
  *
  */
 
-package org.apache.streampipes.processors.changedetection.jvm.cusum;
+package org.apache.streampipes.processors.changedetection.jvm.welford;
 
-public class CusumEventFields {
+public enum WelfordEventFields {
+    VAL_LOW("cumSumLow"),
+    VAL_HIGH("cumSumHigh"),
+    DECISION_LOW("changeDetectedLow"),
+    DECISION_HIGH("changeDetectedHigh");
 
-    public static final String VAL_LOW = "cusumLow";
-    public static final String VAL_HIGH = "cusumHigh";
-    public static final String DECISION_LOW = "changeDetectedLow";
-    public static final String DECISION_HIGH = "changeDetectedHigh";
+    public final String label;
+
+    WelfordEventFields(String label) {
+        this.label = label;
+    }
+
+    @Override
+    public String toString() {
+        return this.label;
+    }
 }
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png
new file mode 100644
index 000000000..444fa4e2c
Binary files /dev/null and b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/icon.png differ
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en
index 72b38eee7..cea9a13f8 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en
@@ -1,5 +1,5 @@
-org.apache.streampipes.processors.changedetection.jvm.cusum.title=Cusum
-org.apache.streampipes.processors.changedetection.jvm.cusum.description=
+org.apache.streampipes.processors.changedetection.jvm.cusum.title=Cusum (Deprecated)
+org.apache.streampipes.processors.changedetection.jvm.cusum.description=This processor is deprecated please, use the processing element `WelfordChangeDetection` instead.
 
 number-mapping.title=Value to observe
 number-mapping.description=Specifies the monitored dimension.
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
new file mode 100644
index 000000000..e3ff41bee
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/documentation.md
@@ -0,0 +1,59 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## Welford Change Detection
+
+<!--
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+-->
+
+***
+
+## Description
+
+Performs change detection on a single dimension of the incoming data stream. This implementation tracks the mean and the standard deviation using Welford's algorithm, which is well suited for data streams. A change is detected if the cumulative deviation from the mean exceeds a certain threshold.
+
+***
+
+## Required input
+
+The welford dectection processor requires a data stream that has at least one field containing a numerical value.
+
+***
+
+## Configuration
+
+### Value to observe
+Specify the dimension of the data stream (e.g. the temperature) on which to perform change detection. 
+
+### Parameter `k`
+`k` controls the sensitivity of the change detector. Its unit are standard deviations. For an observation `x_n`, the value of the cumulative sum is defined as `S_n = max(0, S_{n-1} - z-score(x_n) - k)`. Thus, the cumsum-score `S` icnreases if `S_{n-1} - z-score(x_n) > k`. 
+
+### Parameter `h`
+The alarm threshold in standard deviations. An alarm occurs if `S_n > h` 
+
+## Output
+
+This processor outputs the original data stream plus 
+
+- `cumSumLow`: The cumSum value for negative changes
+- `cumSumHigh`: The cumSum value for positive changes
+- `changeDetectedLow`: Boolean indicating if a negative change was detected
+- `changeDetectedHigh`: Boolean indicating if a positive change was detected
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png
new file mode 100644
index 000000000..444fa4e2c
Binary files /dev/null and b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/icon.png differ
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
similarity index 100%
copy from streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.cusum/strings.en
copy to streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/resources/org.apache.streampipes.processors.changedetection.jvm.welford/strings.en
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
index 9cba63920..7bbc6d84c 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/declarer/StandaloneEventProcessingDeclarer.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventProcessorRuntime;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public abstract class StandaloneEventProcessingDeclarer<B extends
         EventProcessorBindingParams> extends EventProcessorDeclarer<B, StandaloneEventProcessorRuntime<B>> {
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
index f00f12dc9..79971e03a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/binding/EventProcessorBindingParams.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public abstract class EventProcessorBindingParams extends
         BindingParams<DataProcessorInvocation> implements
         Serializable {
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
index bee014951..f6538321a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/EventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 
+@Deprecated(since = "0.70.0", forRemoval = true)
 public interface EventProcessor<B extends EventProcessorBindingParams> extends
         PipelineElement<B, DataProcessorInvocation> {