You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/12/01 15:19:14 UTC

[incubator-streampipes] branch edge-extensions updated: latest changes to performance evaluation

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new d03e36a  latest changes to performance evaluation
d03e36a is described below

commit d03e36a0a8a314bd40442400b756909f019ed2fa
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Nov 30 16:05:42 2021 +0100

    latest changes to performance evaluation
---
 docker-build.sh                                    |   4 +-
 streampipes-node-controller/Dockerfile             |  13 +-
 streampipes-node-controller/aarch64.Dockerfile     |  13 +-
 streampipes-node-controller/arm.Dockerfile         |  13 +-
 .../node/controller/config/EnvConfigParam.java     |   1 +
 .../node/controller/config/NodeConfiguration.java  |  14 +
 .../offloading/OffloadingPolicyManager.java        |  17 +-
 .../strategies/OffloadingStrategyFactory.java      |  23 +-
 ...ionStrategy.java => SelectionStrategyType.java} |  21 +-
 .../ThresholdViolationOffloadingPolicy.java        |  33 +-
 .../selection/PrioritySelectionStrategy.java       |  65 ++--
 .../selection/RandomSelectionStrategy.java         |   9 +-
 .../strategies/selection/SelectionStrategy.java    |   2 +-
 .../PipelineElementReconfigurationHandler.java     |   6 +
 .../management/relay/DataStreamRelayManager.java   |   6 +
 .../statscollector/DockerStatsCollector.java       |   2 +-
 .../EdgeExtensionsGenericPerformanceTest.java      |   7 +-
 .../streampipes/performance/TestFactory.java       |  35 +-
 .../performancetest/CountBasedOffloadingTest.java  | 327 ++++++++++++++++++
 .../performance/performancetest/GenericTest.java   | 120 +++++--
 .../performancetest/PrioOffloadingTest.java        | 379 +++++++++++++++++++++
 .../performance/performancetest/TestConsumer.java  | 155 +++++++++
 22 files changed, 1154 insertions(+), 111 deletions(-)

diff --git a/docker-build.sh b/docker-build.sh
index 1cdde4a..7081b26 100755
--- a/docker-build.sh
+++ b/docker-build.sh
@@ -28,7 +28,7 @@ docker_build_amd(){
 
 docker_build_arm(){
   echo "Docker build for arm ..."
-  docker buildx build \
+  docker buildx build --no-cache --pull \
   --platform linux/arm/v7 \
   -t $repo/$1:$version-armv7 \
   -f $2/arm.Dockerfile $2 --load
@@ -36,7 +36,7 @@ docker_build_arm(){
 
 docker_build_aarch64(){
   echo "Docker build for aarch64 ..."
-  docker buildx build \
+  docker buildx build --no-cache --pull \
   --platform linux/arm64 \
   -t $repo/$1:$version-aarch64 \
   -f $2/aarch64.Dockerfile $2 --load
diff --git a/streampipes-node-controller/Dockerfile b/streampipes-node-controller/Dockerfile
index 7ded02f..8c3a6b4 100644
--- a/streampipes-node-controller/Dockerfile
+++ b/streampipes-node-controller/Dockerfile
@@ -19,7 +19,7 @@ FROM $BASE_IMAGE
 
 MAINTAINER dev@streampipes.apache.org
 
-EXPOSE 7077
+EXPOSE 7077 9010
 ENV CONSUL_LOCATION consul
 
 # Comment:
@@ -31,4 +31,13 @@ RUN set -x; \
 
 COPY target/streampipes-node-controller.jar  /streampipes-node-controller.jar
 
-ENTRYPOINT ["java", "-jar", "/streampipes-node-controller.jar"]
+ENTRYPOINT ["java", \
+            "-Djava.rmi.server.hostname=ipe-streamnuc-2.fzi.de", \
+            "-Dcom.sun.management.jmxremote", \
+            "-Dcom.sun.management.jmxremote.port=9010", \
+            "-Dcom.sun.management.jmxremote.rmi.port=9010", \
+            "-Dcom.sun.management.jmxremote.local.only=false", \
+            "-Dcom.sun.management.jmxremote.ssl=false", \
+            "-Dcom.sun.management.jmxremote.authenticate=false", \
+            "-jar", \
+            "/streampipes-node-controller.jar"]
diff --git a/streampipes-node-controller/aarch64.Dockerfile b/streampipes-node-controller/aarch64.Dockerfile
index 856f64a..b51677e 100644
--- a/streampipes-node-controller/aarch64.Dockerfile
+++ b/streampipes-node-controller/aarch64.Dockerfile
@@ -22,7 +22,7 @@ RUN apt -y update; \
 FROM $BASE_IMAGE
 MAINTAINER dev@streampipes.apache.org
 
-EXPOSE 7077
+EXPOSE 7077 9010
 ENV CONSUL_LOCATION consul
 
 COPY --from=build-dev /usr/bin/qemu-aarch64-static /usr/bin
@@ -34,4 +34,13 @@ RUN set -ex; \
 
 COPY target/streampipes-node-controller.jar  /streampipes-node-controller.jar
 
-ENTRYPOINT ["java", "-jar", "/streampipes-node-controller.jar"]
+ENTRYPOINT ["java", \
+            "-Djava.rmi.server.hostname=0.0.0.0", \
+            "-Dcom.sun.management.jmxremote", \
+            "-Dcom.sun.management.jmxremote.port=9010", \
+            "-Dcom.sun.management.jmxremote.rmi.port=9010", \
+            "-Dcom.sun.management.jmxremote.local.only=false", \
+            "-Dcom.sun.management.jmxremote.ssl=false", \
+            "-Dcom.sun.management.jmxremote.authenticate=false", \
+            "-jar", \
+            "/streampipes-node-controller.jar"]
diff --git a/streampipes-node-controller/arm.Dockerfile b/streampipes-node-controller/arm.Dockerfile
index a155288..01d639b 100644
--- a/streampipes-node-controller/arm.Dockerfile
+++ b/streampipes-node-controller/arm.Dockerfile
@@ -22,7 +22,7 @@ RUN apt -y update; \
 FROM $BASE_IMAGE
 MAINTAINER dev@streampipes.apache.org
 
-EXPOSE 7077
+EXPOSE 7077 9010
 ENV CONSUL_LOCATION consul
 
 COPY --from=build-dev /usr/bin/qemu-arm-static /usr/bin
@@ -38,4 +38,13 @@ RUN set -ex; \
 
 COPY target/streampipes-node-controller.jar  /streampipes-node-controller.jar
 
-ENTRYPOINT ["java", "-jar", "/streampipes-node-controller.jar"]
+ENTRYPOINT ["java", \
+            "-Djava.rmi.server.hostname=ipe-streampi-02.fzi.de", \
+            "-Dcom.sun.management.jmxremote", \
+            "-Dcom.sun.management.jmxremote.port=9010", \
+            "-Dcom.sun.management.jmxremote.rmi.port=9010", \
+            "-Dcom.sun.management.jmxremote.local.only=false", \
+            "-Dcom.sun.management.jmxremote.ssl=false", \
+            "-Dcom.sun.management.jmxremote.authenticate=false", \
+            "-jar", \
+            "/streampipes-node-controller.jar"]
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
index c8ecd58..45c2e62 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
@@ -53,6 +53,7 @@ public enum EnvConfigParam {
     SUPPORTED_PIPELINE_ELEMENTS("SP_SUPPORTED_PIPELINE_ELEMENTS", ""),
     AUTO_OFFLOADING("SP_AUTO_OFFLOADING_ACTIVATED", "false"),
     AUTO_OFFLOADING_STRATEGY("SP_AUTO_OFFLOADING_STRATEGY", "default"),
+    AUTO_OFFLOADING_SELECTION_STRATEGY("SP_AUTO_OFFLOADING_SELECTION_STRATEGY", "random"),
     AUTO_OFFLOADING_THRESHOLD_IN_PERCENT("SP_AUTO_OFFLOADING_THRESHOLD_IN_PERCENT", "90"),
     AUTO_OFFLOADING_MAX_NUM_VIOLATIONS("SP_AUTO_OFFLOADING_MAX_NUM_VIOLATIONS", "4"),
     NODE_STORAGE_PATH("SP_NODE_STORAGE_PATH", "/var/lib/streampipes"),
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
index 59d402f..6c487e8 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
@@ -21,6 +21,7 @@ import org.apache.commons.validator.routines.UrlValidator;
 import org.apache.streampipes.model.node.resources.fielddevice.FieldDeviceAccessResource;
 import org.apache.streampipes.node.controller.config.utils.ConfigUtils;
 import org.apache.streampipes.node.controller.management.offloading.strategies.OffloadingStrategyType;
+import org.apache.streampipes.node.controller.management.offloading.strategies.SelectionStrategyType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +63,7 @@ public final class NodeConfiguration {
     private static String consulHost;
     private static boolean autoOffloadingActivated;
     private static OffloadingStrategyType autoOffloadingStrategy;
+    private static SelectionStrategyType autoOffloadingSelectionStrategy;
     private static float autoOffloadingThresholdInPercent;
     private static int autoOffloadingMaxNumViolations;
     private static String nodeStoragePath;
@@ -313,6 +315,14 @@ public final class NodeConfiguration {
         NodeConfiguration.autoOffloadingStrategy = autoOffloadingStrategy;
     }
 
+    public static SelectionStrategyType getAutoOffloadingSelectionStrategy() {
+        return autoOffloadingSelectionStrategy;
+    }
+
+    public static void setAutoOffloadingSelectionStrategy(SelectionStrategyType autoOffloadingSelectionStrategy) {
+        NodeConfiguration.autoOffloadingSelectionStrategy = autoOffloadingSelectionStrategy;
+    }
+
     public static float getAutoOffloadingThresholdInPercent() {
         return autoOffloadingThresholdInPercent;
     }
@@ -564,6 +574,10 @@ public final class NodeConfiguration {
                     configMap.put(envKey, value);
                     setAutoOffloadingStrategy(OffloadingStrategyType.fromString(value));
                     break;
+                case AUTO_OFFLOADING_SELECTION_STRATEGY:
+                    configMap.put(envKey, value);
+                    setAutoOffloadingSelectionStrategy(SelectionStrategyType.fromString(value));
+                    break;
                 case AUTO_OFFLOADING_THRESHOLD_IN_PERCENT:
                     configMap.put(envKey, value);
                     setAutoOffloadingThresholdInPercent(Float.parseFloat(value));
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index 2fc1271..e834687 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.model.node.monitor.ResourceMetrics;
-import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
 import org.apache.streampipes.node.controller.config.NodeConfiguration;
 import org.apache.streampipes.node.controller.management.offloading.strategies.OffloadingStrategy;
 import org.apache.streampipes.node.controller.management.pe.PipelineElementManager;
@@ -44,7 +43,7 @@ public class OffloadingPolicyManager {
     private static final String NODE_MANAGEMENT_ONLINE_ENDPOINT = BACKEND_BASE_ROUTE + "/nodes/online";
 
     private final List<OffloadingStrategy<?>> offloadingStrategies = new ArrayList<>();
-    private final List<InvocableStreamPipesEntity> unsuccessfullyTriedEntities = new ArrayList<>();
+    private final List<InvocableStreamPipesEntity> blacklistInvocables = new ArrayList<>();
     private static OffloadingPolicyManager instance;
     private static final Logger LOG = LoggerFactory.getLogger(OffloadingPolicyManager.class.getCanonicalName());
     //private static EvaluationLogger logger = EvaluationLogger.getInstance();
@@ -74,26 +73,32 @@ public class OffloadingPolicyManager {
             triggerOffloading(violatedPolicies.get(0));
         }
         //Blacklist of entities is cleared when no policies were violated.
-        else unsuccessfullyTriedEntities.clear();
+        else blacklistInvocables.clear();
     }
 
     private void triggerOffloading(OffloadingStrategy strategy){
-        InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
+        InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.blacklistInvocables);
         EvaluationLogger.getInstance().logMQTT("Offloading", "entity to offload selected");
         if(offloadEntity != null){
             Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
 
             String appId = offloadEntity.getAppId();
+            String runningInstanceId = offloadEntity.getDeploymentRunningInstanceId();
             String pipelineName = offloadEntity.getCorrespondingPipeline();
+            int prioScore = offloadEntity.getPriorityScore();
 
-            EvaluationLogger.getInstance().logMQTT("Offloading", "offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId);
+            EvaluationLogger.getInstance().logMQTT(
+                    "Offloading",
+                    "offloading done",
+                    strategy.getOffloadingPolicy().getClass().getSimpleName(),
+                    appId + "_prio" + prioScore + "_" + pipelineName);
 
             if(resp.isSuccess()){
                 LOG.info("Successfully offloaded: {} of pipeline: {}", appId, pipelineName);
                 strategy.getOffloadingPolicy().reset();
             } else{
                 LOG.warn("Failed to offload: {} of pipeline: {}", appId, pipelineName);
-                unsuccessfullyTriedEntities.add(offloadEntity);
+                blacklistInvocables.add(offloadEntity);
             }
         } else LOG.info("No pipeline element found to offload");
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
index 8e17b6d..b7ce04b 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
@@ -25,7 +25,9 @@ import org.apache.streampipes.node.controller.management.offloading.strategies.p
 import org.apache.streampipes.node.controller.management.offloading.strategies.property.CPULoadResourceProperty;
 import org.apache.streampipes.node.controller.management.offloading.strategies.property.FreeDiskSpaceResourceProperty;
 import org.apache.streampipes.node.controller.management.offloading.strategies.property.FreeMemoryResourceProperty;
+import org.apache.streampipes.node.controller.management.offloading.strategies.selection.PrioritySelectionStrategy;
 import org.apache.streampipes.node.controller.management.offloading.strategies.selection.RandomSelectionStrategy;
+import org.apache.streampipes.node.controller.management.offloading.strategies.selection.SelectionStrategy;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -33,7 +35,7 @@ import java.util.List;
 
 public class OffloadingStrategyFactory {
 
-    private static final int HISTORY_QUEUE_SIZE_FACTOR = 5;
+    private static final int HISTORY_QUEUE_SIZE_FACTOR = 1;
 
     public List<OffloadingStrategy<?>> select(){
 
@@ -51,6 +53,17 @@ public class OffloadingStrategyFactory {
         }
     }
 
+    private SelectionStrategy getAutoOffloadingSelectionStrategy() {
+        switch (NodeConfiguration.getAutoOffloadingSelectionStrategy()) {
+            case RANDOM:
+                return new RandomSelectionStrategy();
+            case PRIO:
+                return new PrioritySelectionStrategy();
+            default:
+                return new RandomSelectionStrategy();
+        }
+    }
+
     private OffloadingStrategy<Float> getDebugOffloadingPolicy() {
         return new OffloadingStrategy<Float>(
                 new ThresholdViolationOffloadingPolicy<>(
@@ -59,7 +72,7 @@ public class OffloadingStrategyFactory {
                         0.5f,
                         1),
                 new CPULoadResourceProperty(),
-                new RandomSelectionStrategy());
+                getAutoOffloadingSelectionStrategy());
     }
 
     private List<OffloadingStrategy<?>> getDefaultStrategy(){
@@ -78,7 +91,7 @@ public class OffloadingStrategyFactory {
                         NodeConfiguration.getAutoOffloadingThresholdInPercent(),
                         NodeConfiguration.getAutoOffloadingMaxNumViolations()),
                 new CPULoadResourceProperty(),
-                new RandomSelectionStrategy());
+                getAutoOffloadingSelectionStrategy());
     }
 
     private OffloadingStrategy<Long> getDefaultMemoryOffloadingPolicy(){
@@ -92,7 +105,7 @@ public class OffloadingStrategyFactory {
                         threshold,
                         NodeConfiguration.getAutoOffloadingMaxNumViolations()),
                 new FreeMemoryResourceProperty(),
-                new RandomSelectionStrategy());
+                getAutoOffloadingSelectionStrategy());
     }
 
     private OffloadingStrategy<Long> getDefaultDiskSpaceOffloadingPolicy(){
@@ -106,6 +119,6 @@ public class OffloadingStrategyFactory {
                         threshold,
                         NodeConfiguration.getAutoOffloadingMaxNumViolations()),
                 new FreeDiskSpaceResourceProperty(),
-                new RandomSelectionStrategy());
+                getAutoOffloadingSelectionStrategy());
     }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/SelectionStrategyType.java
similarity index 65%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
copy to streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/SelectionStrategyType.java
index 54ed1f8..5e01b92 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/SelectionStrategyType.java
@@ -15,13 +15,24 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.management.offloading.strategies;
 
-package org.apache.streampipes.node.controller.management.offloading.strategies.selection;
+public enum SelectionStrategyType {
+    RANDOM("random"),
+    PRIO("prio");
 
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+    String name;
 
-import java.util.List;
+    SelectionStrategyType(String name) {
+        this.name = name;
+    }
 
-public interface SelectionStrategy {
-    InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities);
+    public static SelectionStrategyType fromString(String name) {
+        for (SelectionStrategyType o : SelectionStrategyType.values()) {
+            if (o.name.equalsIgnoreCase(name.toLowerCase())) {
+                return o;
+            }
+        }
+        return null;
+    }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index 95eb8ff..5ff48dc 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -29,12 +29,16 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
             LoggerFactory.getLogger(ThresholdViolationOffloadingPolicy.class.getCanonicalName());
 
     private Queue<T> history;
+    private final int length;
     private final T threshold;
     private final Comparator comparator;
     private final int numberOfThresholdViolations;
 
+    private int numPreviousViolations = 0;
+
     public ThresholdViolationOffloadingPolicy(int length, Comparator comparator, T threshold,
                                               int numberOfThresholdViolations){
+        this.length = length;
         this.history = new ArrayBlockingQueue<>(length);
         this.comparator = comparator;
         this.threshold = threshold;
@@ -54,16 +58,16 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
         if(!this.history.offer(value)) {
             this.history.poll();
             this.history.offer(value);
-            //Only for logging; can be removed later
-            if(value.compareTo(this.threshold) > 0){
-                int numViolations = 0;
-                for(T val : this.history){
-                    if(val.compareTo(this.threshold) > 0){
-                        numViolations++;
-                    }
-                }
-                EvaluationLogger.getInstance().logMQTT("Offloading", "policy violation #" + numViolations);
-            }
+            //TODO: Only for logging; can be removed later
+//            if(value.compareTo(this.threshold) > 0){
+//                int numViolations = 0;
+//                for(T val : this.history){
+//                    if(val.compareTo(this.threshold) > 0){
+//                        numViolations++;
+//                    }
+//                }
+//                EvaluationLogger.getInstance().logMQTT("Offloading", "policy violation #" + numViolations);
+//            }
         }
     }
 
@@ -86,6 +90,12 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
                 }
                 break;
         }
+
+        if (numViolations > numPreviousViolations) {
+            EvaluationLogger.getInstance().logMQTT("Offloading", "policy violation #" + numViolations);
+            numPreviousViolations = numViolations;
+        }
+
         if(numViolations >= this.numberOfThresholdViolations){
             LOG.info("Threshold violation detected");
             return true;
@@ -95,7 +105,10 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
 
     @Override
     public void reset() {
+        LOG.info("Reset history queue to length: " + this.history.size());
         this.history = new ArrayBlockingQueue<>(this.history.size());
+        LOG.info("Reset number previous violation counter from: " + numPreviousViolations + "-> to: " + 0);
+        this.numPreviousViolations = 0;
     }
 
     //TODO: Remove -- Only for debugging purposes
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
index e52ec72..ef6378a 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
@@ -19,12 +19,9 @@
 package org.apache.streampipes.node.controller.management.offloading.strategies.selection;
 
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.node.controller.config.NodeConfiguration;
-import org.apache.streampipes.node.controller.management.offloading.OffloadingPolicyManager;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -32,53 +29,29 @@ import java.util.stream.Collectors;
 public class PrioritySelectionStrategy implements SelectionStrategy{
 
     @Override
-    public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
-        List<InvocableStreamPipesEntity> runningPipelineElements = RunningInvocableInstances.INSTANCE.getAll();
-
-        //List all other nodes that are online
-        // TODO: this involves request to central backend (deactivated; assess if provides meaningful benefit)
-        /*List<NodeInfoDescription> onlineNodes = getNodeInfos().stream().filter(desc ->
-                !desc.getNodeControllerId().equals(NodeConfiguration.getNodeControllerId()))
-                .collect(Collectors.toList());*/
-
-        List<InvocableStreamPipesEntity> candidatesForOffloading = runningPipelineElements.stream()
-                        .filter(entity -> isNotBlacklisted(entity, blacklistedEntities))
-                        .filter(InvocableStreamPipesEntity::isPreemption)
-                        //.filter(entity -> verifyIfSupportedOnOtherNodes(entity, onlineNodes))
-                        .collect(Collectors.toList());
-
-        if (candidatesForOffloading.isEmpty()) {
+    public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistInvocables) {
+        // Sinks cannot be migrated
+        List<InvocableStreamPipesEntity> candidateInvocables = RunningInvocableInstances.INSTANCE.getAll().stream()
+                .filter(e -> e instanceof DataProcessorInvocation)
+                .filter(entity -> isNotBlacklisted(entity, blacklistInvocables))
+                .filter(InvocableStreamPipesEntity::isPreemption)
+                .collect(Collectors.toList());
+        if(candidateInvocables.size() == 0)
             return null;
-        } else {
-            return selectPipelineElementWithLowestPriorityScore(candidatesForOffloading);
-        }
-    }
-
-    private List<NodeInfoDescription> getNodeInfos(){
-        return OffloadingPolicyManager.getInstance().getOnlineNodes();
+        return selectWithLowestPrioScore(candidateInvocables);
     }
 
-    private InvocableStreamPipesEntity selectPipelineElementWithLowestPriorityScore(List<InvocableStreamPipesEntity> entities){
-        // sort pipeline element priorityScores ascending
-        List<InvocableStreamPipesEntity> sortedPipelineElements = entities.stream()
+    private InvocableStreamPipesEntity selectWithLowestPrioScore(List<InvocableStreamPipesEntity> candidateInvocables){
+        // sort pipeline element priorityScores ascending and return element with lowest score
+        return candidateInvocables.stream()
                 .sorted(Comparator.comparingInt(InvocableStreamPipesEntity::getPriorityScore))
-                .collect(Collectors.toList());
-
-        // return first element with lowest score
-        return sortedPipelineElements.get(0);
-    }
-
-    private boolean verifyIfSupportedOnOtherNodes(InvocableStreamPipesEntity entity, List<NodeInfoDescription> nodeInfos){
-        List<NodeInfoDescription> candidateNodes = new ArrayList<>();
-        for(NodeInfoDescription nodeInfo : nodeInfos){
-            if (nodeInfo.getSupportedElements().contains(entity.getAppId()))
-                candidateNodes.add(nodeInfo);
-        }
-        return !candidateNodes.isEmpty();
+                .collect(Collectors.toList()).get(0);
     }
 
-    private boolean isNotBlacklisted(InvocableStreamPipesEntity entity, List<InvocableStreamPipesEntity> blacklist){
-        return blacklist.stream().noneMatch(blacklistedEntity ->
-                blacklistedEntity.getDeploymentRunningInstanceId().equals(entity.getDeploymentRunningInstanceId()));
+    private boolean isNotBlacklisted(InvocableStreamPipesEntity entity,
+                                     List<InvocableStreamPipesEntity> blacklistInvocables){
+        return blacklistInvocables.stream()
+                .noneMatch(blacklistEntity -> blacklistEntity
+                        .getDeploymentRunningInstanceId().equals(entity.getDeploymentRunningInstanceId()));
     }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
index f89c9b5..dabe841 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
@@ -26,13 +26,14 @@ import java.util.List;
 import java.util.Random;
 import java.util.stream.Collectors;
 
-public class RandomSelectionStrategy implements SelectionStrategy{
+public class RandomSelectionStrategy implements SelectionStrategy {
 
     @Override
-    public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
+    public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistInvocables) {
         //Sinks cannot be migrated atm
-        List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll().stream().
-                filter(e -> e instanceof DataProcessorInvocation).collect(Collectors.toList());
+        List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll().stream()
+                .filter(e -> e instanceof DataProcessorInvocation)
+                .collect(Collectors.toList());
         if(instances.size() == 0)
             return null;
         return instances.get(new Random().nextInt(instances.size()));
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
index 54ed1f8..e889c0d 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
@@ -23,5 +23,5 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import java.util.List;
 
 public interface SelectionStrategy {
-    InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities);
+    InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistInvocables);
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementReconfigurationHandler.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementReconfigurationHandler.java
index 979cdec..be5ea86 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementReconfigurationHandler.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementReconfigurationHandler.java
@@ -32,6 +32,8 @@ import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.node.controller.management.IHandler;
 import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
 import org.apache.streampipes.node.controller.utils.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -41,6 +43,8 @@ import java.util.Map;
 
 public class PipelineElementReconfigurationHandler implements IHandler<Response> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(PipelineElementReconfigurationHandler.class.getCanonicalName());
+
     private static final String DOT = ".";
     private static final String RECONFIGURATION_TOPIC = "org.apache.streampipes.control.event.reconfigure";
 
@@ -63,6 +67,8 @@ public class PipelineElementReconfigurationHandler implements IHandler<Response>
         EventProducer pub = getReconfigurationEventProducer();
 
         byte [] reconfigurationEvent = reconfigurationToByteArray();
+
+        LOG.info("Publish reconfiguration event to pipeline element {}", graph.getDeploymentRunningInstanceId());
         pub.publish(reconfigurationEvent);
         pub.disconnect();
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/DataStreamRelayManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/DataStreamRelayManager.java
index ef8f07a..d2ad5a1 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/DataStreamRelayManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/DataStreamRelayManager.java
@@ -18,6 +18,7 @@
 package org.apache.streampipes.node.controller.management.relay;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
@@ -36,6 +37,9 @@ public class DataStreamRelayManager {
 
     private static DataStreamRelayManager instance = null;
 
+    // TODO: remove after tests
+    private static final EvaluationLogger logger = EvaluationLogger.getInstance();
+
     private DataStreamRelayManager() {}
 
     public static DataStreamRelayManager getInstance() {
@@ -59,6 +63,7 @@ public class DataStreamRelayManager {
 
         Map<String, EventRelay> eventRelayMap = new HashMap<>();
 
+        logger.logMQTT("Migration", "node controller start relay", "");
         // start data stream relay
         // 1:1 mapping -> remote forward
         // 1:n mapping -> remote fan-out
@@ -76,6 +81,7 @@ public class DataStreamRelayManager {
     }
 
     public Response stop(String id) {
+        logger.logMQTT("Migration", "node controller stop relay", "");
         Map<String, EventRelay> relays = RunningRelayInstances.INSTANCE.get(id);
         if (relays != null) {
             relays.values().forEach(EventRelay::stop);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
index e812b02..d7cc49d 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
@@ -54,7 +54,7 @@ public class DockerStatsCollector {
     public void run() {
         LOG.debug("Create Docker stats scheduler");
         ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
-        scheduledExecutorService.scheduleAtFixedRate(collect, 0, 1, TimeUnit.SECONDS);
+        scheduledExecutorService.scheduleAtFixedRate(collect, 0, DOCKER_STATS_COLLECT_FREQ_SECS, TimeUnit.SECONDS);
 
         Object[] header = new Object[]{
                 "timestamp",
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
index d76d7b6..27e8358 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
@@ -18,11 +18,14 @@
 package org.apache.streampipes.performance;
 
 import org.apache.streampipes.performance.performancetest.Test;
-import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class EdgeExtensionsGenericPerformanceTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger(EdgeExtensionsGenericPerformanceTest.class.getCanonicalName());
+
     public static void main(String ... args){
 
         int numberOfRuns = Integer.parseInt(System.getenv("TEST_RUNS"));
@@ -31,6 +34,8 @@ public class EdgeExtensionsGenericPerformanceTest {
         for(int i = 1; i <= numberOfRuns; i++){
             if (i==numberOfRuns)
                 test.setStopPipeline(true);
+
+            LOG.info("Executing run: {}/{}", i, numberOfRuns);
             test.execute(i);
         }
     }
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 062c067..c9dae0b 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -18,7 +18,9 @@
 package org.apache.streampipes.performance;
 
 import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.apache.streampipes.performance.performancetest.CountBasedOffloadingTest;
 import org.apache.streampipes.performance.performancetest.GenericTest;
+import org.apache.streampipes.performance.performancetest.PrioOffloadingTest;
 import org.apache.streampipes.performance.performancetest.Test;
 
 public class TestFactory {
@@ -45,6 +47,16 @@ public class TestFactory {
                 Object[] header_offloading = {"timestampInMillis", "deviceId", "event", "policy", "selectedProcessor"};
                 logger.logHeader("Offloading", header_offloading);
                 return getOffloadingTest();
+            case "OffloadingPrio":
+                Object[] header_offloading_multi = {"timestampInMillis", "deviceId", "event", "policy",
+                        "selectedProcessor"};
+                logger.logHeader("Offloading", header_offloading_multi);
+                return getOffloadingPrioSelectionTest();
+            case "OffloadingCountBased":
+                Object[] header_offloading_count_based = {"timestampInMillis", "deviceId", "event", "policy",
+                        "selectedProcessor"};
+                logger.logHeader("Offloading", header_offloading_count_based);
+                return getOffloadingCountBasedTest();
             default:
                 throw new RuntimeException("No test configuration found.");
         }
@@ -73,7 +85,16 @@ public class TestFactory {
 
     public static Test getOffloadingTest(){
         return new GenericTest(getPipelineName(), false, false,
-                true, 20000, 1500000);
+                true, 300000, 5400000);
+    }
+
+    public static Test getOffloadingPrioSelectionTest() {
+        return new PrioOffloadingTest(getLowPrioPipelineName(), getHighPrioPipelineName(),
+                false,  300000);
+    }
+
+    private static Test getOffloadingCountBasedTest() {
+        return new CountBasedOffloadingTest(getPipelineName(), false, 300000);
     }
 
     //Helpers
@@ -83,4 +104,16 @@ public class TestFactory {
         return pipelineName;
     }
 
+    private static String getLowPrioPipelineName() {
+        final String lowPrioName = System.getenv("TEST_LOW_PRIO_PIPELINE_NAME");
+        if (lowPrioName != null) return lowPrioName; // configured: use it as is
+        throw new RuntimeException("No Pipeline Name provided.");
+    }
+
+    private static String getHighPrioPipelineName() {
+        final String highPrioName = System.getenv("TEST_HIGH_PRIO_PIPELINE_NAME");
+        if (highPrioName != null) return highPrioName; // configured: use it as is
+        throw new RuntimeException("No Pipeline Name provided.");
+    }
+
 }
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/CountBasedOffloadingTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/CountBasedOffloadingTest.java
new file mode 100644
index 0000000..44f4dd7
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/CountBasedOffloadingTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance.performancetest;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.messaging.mqtt.MqttConsumer;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
+import org.apache.streampipes.model.grounding.TopicDefinition;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+
+public class CountBasedOffloadingTest implements Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CountBasedOffloadingTest.class.getCanonicalName());
+
+    private boolean stopPipeline;
+    private final long timeToSleepBetweenSteps;
+    private final StreamPipesClient client;
+    private final String pipelineName;
+    private Pipeline pipeline;
+    private final String[] cpuSteps;
+    private final int maxOffloadingActions;
+
+    public CountBasedOffloadingTest(String pipelineName,
+                                    boolean stopPipeline,
+                                    long timeToSleepBetweenSteps) {
+        this.pipelineName = pipelineName;
+        this.stopPipeline = stopPipeline;
+        this.timeToSleepBetweenSteps = timeToSleepBetweenSteps;
+
+        // The client has to exist before the pipeline can be looked up by name.
+        this.client = createStreamPipesClient();
+        this.pipeline = findPipelineByName(pipelineName);
+        // Mandatory load profile (OFFLOADING_CPU_LOAD_STEPS).
+        this.cpuSteps = loadFromEnv();
+
+        // Optional limit (MAX_OFFLOADING_ACTIONS), 50 when unset.
+        String maxActions = System.getenv("MAX_OFFLOADING_ACTIONS");
+        this.maxOffloadingActions = maxActions == null ? 50 : Integer.parseInt(maxActions);
+    }
+
+    @Override
+    public void setStopPipeline(boolean stopPipeline) {
+        this.stopPipeline = stopPipeline;
+    }
+
+    @Override
+    public void execute(int nrRuns) {
+
+        // Start the pipeline first if it is not running yet.
+        if (!pipeline.isRunning()) {
+            startPipeline();
+            // Give the pipeline a moment before its values are reconfigured.
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+        // The observer receives the ramp-up thread so that it can interrupt it.
+        Thread rampUp = beginRampUpAndOffloadingInterval();
+        Thread observer = offloadingObserverThread(rampUp);
+
+        rampUp.start();
+        observer.start();
+
+        LOG.info("Wait until max. number offloadings is completed: {}", maxOffloadingActions);
+        try {
+            rampUp.join();
+            observer.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        // Reconfigure to the first configured load step and keep running for five minutes.
+        adaptPipelineCpuLoad();
+
+        Duration finalSegment = Duration.ofMinutes(5);
+        LOG.info("Sleep for final segment of offloading test: {} min", finalSegment.toMinutes());
+        try {
+            Thread.sleep(finalSegment.toMillis());
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        // Stop the pipeline only when requested and while it is still running.
+        if (stopPipeline && pipeline.isRunning()) {
+            stopPipeline(pipeline);
+        }
+    }
+
+
+    // Helpers
+
+    private void adaptPipelineCpuLoad() {
+        // Work on the latest pipeline description from the backend.
+        pipeline = findPipelineByName(pipelineName);
+
+        final String processorName = "CPU Burner";
+        String node = findDeploymentTargetByProcessorName(pipeline, processorName);
+        LOG.info("Prepare and reconfigure pipeline {} with {} on node {}", pipeline.getName(), processorName, node);
+
+        // Apply the first configured CPU load step.
+        prepareAndReconfigurePipeline(pipeline, "load", cpuSteps[0]);
+    }
+
+    private void startPipeline() {
+        // Ask the backend to start the pipeline, then report where the CPU Burner is deployed.
+        final String processorName = "CPU Burner";
+        PipelineOperationStatus startStatus = client.pipelines().start(pipeline);
+        String node = findDeploymentTargetByProcessorName(pipeline, processorName);
+        LOG.info("Start status pipeline {} with {} on node {}", startStatus.getTitle(), processorName, node);
+
+        // Mirror the new state locally only when the start succeeded.
+        if (!startStatus.isSuccess()) {
+            return;
+        }
+        pipeline.setRunning(true);
+    }
+
+    private StreamPipesClient createStreamPipesClient() {
+        // Connection settings come from the environment: SP_USER, SP_API_KEY, SP_HOST, SP_PORT.
+        String user = System.getenv("SP_USER");
+        String apiKey = System.getenv("SP_API_KEY");
+        StreamPipesCredentials credentials = StreamPipesCredentials.from(user, apiKey);
+
+        String host = System.getenv("SP_HOST");
+        int port = Integer.parseInt(System.getenv("SP_PORT"));
+        return StreamPipesClient.create(host, port, credentials, true);
+    }
+
+    private void prepareAndReconfigurePipeline(Pipeline pipeline,
+                                               String fstInternalName,
+                                               String fstReconfigurationValue) {
+        preparingPipeline(pipeline, fstInternalName, fstReconfigurationValue);
+        reconfiguringPipeline(pipeline, fstReconfigurationValue);
+    }
+
+    private void preparingPipeline(Pipeline pipeline, String fstInternalName, String fstReconfigurationValue) {
+        float loadPercent = Float.parseFloat(fstReconfigurationValue) * 100;
+        LOG.info("Set CPU load for {} pipeline to: {} percent", pipeline.getName(), loadPercent);
+        // Overwrite the value of every reconfigurable free-text property with the given internal name.
+        for (DataProcessorInvocation processor : pipeline.getSepas()) {
+            for (Object property : processor.getStaticProperties()) {
+                if (!(property instanceof FreeTextStaticProperty)) {
+                    continue;
+                }
+                FreeTextStaticProperty fsp = (FreeTextStaticProperty) property;
+                if (fsp.isReconfigurable() && fsp.getInternalName().equals(fstInternalName)) {
+                    fsp.setValue(fstReconfigurationValue);
+                }
+            }
+        }
+    }
+
+    private void reconfiguringPipeline(Pipeline pipeline, String fstReconfigurationValue) {
+        LOG.info("Reconfiguration triggered with value " + fstReconfigurationValue);
+        // Confirm only when the backend reports success (the check used to be inverted).
+        if (client.pipelines().reconfigure(pipeline).isSuccess()) {
+            LOG.info("Pipeline {} successfully reconfigured", pipeline.getName());
+        }
+    }
+
+    private void stopPipeline(Pipeline pipeline) {
+        PipelineOperationStatus stopStatus = client.pipelines().stop(pipeline);
+        if (stopStatus.isSuccess()) {
+            LOG.info("Stop status pipeline: " + stopStatus.getTitle());
+            this.pipeline.setRunning(false);
+        } else { // pass the name as an SLF4J argument so the {} placeholder is filled in
+            LOG.info("Pipeline {} could not be stopped.", stopStatus.getPipelineName());
+        }
+    }
+
+    private Pipeline findPipelineByName(String pipelineName) {
+        // Look the pipeline up in a fresh listing from the backend.
+        for (Pipeline candidate : client.pipelines().all()) {
+            if (candidate.getName().equals(pipelineName)) return candidate;
+        }
+        throw new RuntimeException("Pipeline not found");
+    }
+
+    private String findDeploymentTargetByProcessorName(Pipeline pipeline, String processorName) {
+        // The first processor carrying the given name determines the deployment node.
+        for (DataProcessorInvocation processor : pipeline.getSepas()) {
+            if (processor.getName().equals(processorName)) return processor.getDeploymentTargetNodeId();
+        }
+        throw new RuntimeException("Processor not found");
+    }
+
+    private Thread offloadingObserverThread(Thread rampUpThread) {
+        LOG.info("Offloading observer thread started");
+        return new Thread() {
+            // Counts "offloading done" events per origin device and interrupts the ramp-up thread
+            // once every device seen so far has reached maxOffloadingActions.
+            public void run() {
+
+                Map<String, Integer> offloadingDeviceMap = new HashMap<>();
+
+                String mqttLoggingUrlString = System.getenv("SP_LOGGING_MQTT_URL");
+                try {
+                    MqttTransportProtocol transportProtocol = makeMqttTransportProtocol(
+                            new URI(mqttLoggingUrlString),
+                            "Offloading");
+
+                    MqttConsumer consumer = new MqttConsumer();
+                    consumer.connect(transportProtocol, event -> {
+                        // Limit -1 keeps trailing empty fields, so an empty last column still counts.
+                        String[] eventArray = new String(event, StandardCharsets.UTF_8).split(",", -1);
+                        if (!Arrays.asList(eventArray).contains("offloading done")) {
+                            return;
+                        }
+                        if (eventArray.length < 5) {
+                            LOG.warn("Ignoring malformed offloading event with {} fields", eventArray.length);
+                            return;
+                        }
+                        LOG.info("Pipeline element offloaded {}", eventArray[4]);
+
+                        String originDeviceId = eventArray[1];
+                        int offloadingCount = offloadingDeviceMap.merge(originDeviceId, 1, Integer::sum);
+                        LOG.info("Current number of offloadings for {}: {}/{}",
+                                originDeviceId, offloadingCount, maxOffloadingActions);
+
+                        boolean reachedMaxOffloadingActions = offloadingDeviceMap.values()
+                                .stream()
+                                .allMatch(numOffloadingActions -> numOffloadingActions >= maxOffloadingActions);
+
+                        if (reachedMaxOffloadingActions) {
+                            LOG.info("Reached max offloading actions");
+                            // Ends the ramp-up thread that execute() is joining on.
+                            rampUpThread.interrupt();
+                            try {
+                                consumer.close();
+                            } catch (Exception e) {
+                                LOG.error("Could not close offloading MQTT consumer", e);
+                            }
+                        }
+                    });
+                } catch (URISyntaxException e) {
+                    LOG.error("Invalid SP_LOGGING_MQTT_URL: {}", mqttLoggingUrlString, e);
+                }
+            }
+        };
+    }
+
+    private Thread beginRampUpAndOffloadingInterval() {
+        LOG.info("Initial ramp up interval thread started");
+        return new Thread() {
+            // Applies each CPU load step once, then idles until the observer interrupts this thread.
+            public void run() {
+                try {
+                    // As before, a profile with a single step is not ramped up at all.
+                    if (cpuSteps.length > 1) {
+                        boolean firstStep = true;
+                        for (String cpuStep : cpuSteps) {
+                            if (!firstStep) {
+                                LOG.info("Update pipeline locations");
+                                pipeline = findPipelineByName(pipelineName);
+                            }
+                            String node = findDeploymentTargetByProcessorName(pipeline, "CPU Burner");
+                            LOG.info("Prepare and reconfigure pipeline {} with {} on node {}",
+                                    pipeline.getName(),
+                                    "CPU Burner",
+                                    node);
+                            prepareAndReconfigurePipeline(pipeline, "load", cpuStep);
+                            firstStep = false;
+                            Thread.sleep(timeToSleepBetweenSteps);
+                        }
+                    }
+                    // Sleep instead of spinning: the old loop kept a core busy while waiting.
+                    while (true) {
+                        Thread.sleep(1000);
+                    }
+                } catch (InterruptedException e) {
+                    LOG.info("Interrupted cpu steps thread");
+                    Thread.currentThread().interrupt();
+                }
+            }
+        };
+    }
+
+    private MqttTransportProtocol makeMqttTransportProtocol(URI uri, String topic) {
+        // Host and port are taken from the URI; the topic is wrapped in a simple topic definition.
+        MqttTransportProtocol mqttProtocol = new MqttTransportProtocol();
+        mqttProtocol.setBrokerHostname(uri.getHost());
+        mqttProtocol.setPort(uri.getPort());
+        mqttProtocol.setTopicDefinition(new SimpleTopicDefinition(topic));
+        return mqttProtocol;
+    }
+
+    private String[] loadFromEnv() {
+        // Load profile as a semicolon-separated list; the variable is mandatory.
+        String rawSteps = System.getenv("OFFLOADING_CPU_LOAD_STEPS");
+        if (rawSteps == null) {
+            throw new RuntimeException("CPU load steps must be provided");
+        }
+        return rawSteps.split(";");
+    }
+
+}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 25e7a1a..6921114 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -25,7 +25,6 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
-
 import java.util.List;
 import java.util.Optional;
 
@@ -41,8 +40,16 @@ public class GenericTest implements Test{
     private final EvaluationLogger evalLogger = EvaluationLogger.getInstance();
     private float reconfigurableValue = 1;
 
-    public GenericTest(String pipelineName, boolean stopPipeline, boolean shouldBeMigrated, boolean shouldBeReconfigured,
-                       long timeToSleepBeforeManipulation, long timeToSleepAfterManipulation){
+    private final String pipelineName;
+
+    public GenericTest(String pipelineName,
+                       boolean stopPipeline,
+                       boolean shouldBeMigrated,
+                       boolean shouldBeReconfigured,
+                       long timeToSleepBeforeManipulation,
+                       long timeToSleepAfterManipulation){
+
+        this.pipelineName = pipelineName;
         this.stopPipeline = stopPipeline;
         this.shouldBeMigrated = shouldBeMigrated;
         this.shouldBeReconfigured = shouldBeReconfigured;
@@ -69,7 +76,13 @@ public class GenericTest implements Test{
     public void execute(int nrRuns) {
 
         String testType = System.getenv("TEST_TYPE");
-        String offloadingThreshold = System.getenv("OFFLOADING_THRESHOLD");
+
+        String[] cpuLoadTargets = System.getenv("OFFLOADING_CPU_LOAD_STEPS") != null ? System.getenv(
+                "OFFLOADING_CPU_LOAD_STEPS").split(";") : null;
+        String cpuEndLoad = System.getenv("OFFLOADING_CPU_LOAD_END") != null ?
+                System.getenv("OFFLOADING_CPU_LOAD_END") : "0.2";
+
+        // 0.5;0.9
 
         Object[] line = null;
 
@@ -115,32 +128,56 @@ public class GenericTest implements Test{
         }
         //Reconfiguration
         if (shouldBeReconfigured) {
-            if (testType.equals("Reconfiguration"))
-                pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
-                    .filter(FreeTextStaticProperty.class::isInstance)
-                    .map(FreeTextStaticProperty.class::cast)
-                    .filter(FreeTextStaticProperty::isReconfigurable)
-                    .forEach(sp -> {
-                        if (sp.getInternalName().equals("i-am-reconfigurable")) {
-                            sp.setValue(Float.toString(this.reconfigurableValue++));
-                        }
-                    }));
-            else if (testType.equals("Offloading"))
+            if (testType.equals("Reconfiguration")) {
                 pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
                         .filter(FreeTextStaticProperty.class::isInstance)
                         .map(FreeTextStaticProperty.class::cast)
                         .filter(FreeTextStaticProperty::isReconfigurable)
                         .forEach(sp -> {
-                            if (sp.getInternalName().equals("load")) {
-                                sp.setValue(offloadingThreshold != null ? offloadingThreshold : "0.9");
+                            if (sp.getInternalName().equals("i-am-reconfigurable")) {
+                                sp.setValue(Float.toString(this.reconfigurableValue++));
                             }
                         }));
-            line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
-            System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
-            PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
-            System.out.println(message.getTitle());
-            if (!message.isSuccess()) {
-                line[line.length -1] = false;
+
+                line = reconfigurePipeline(pipeline, nrRuns);
+            }
+            else if (testType.equals("Offloading")) {
+
+                // when having load profiles
+                if (cpuLoadTargets != null && cpuLoadTargets.length > 1) {
+                    for (String load : cpuLoadTargets) {
+                        System.out.printf("Set CPU load to: %f percent", Float.parseFloat(load) * 100);
+                        pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+                                .filter(FreeTextStaticProperty.class::isInstance)
+                                .map(FreeTextStaticProperty.class::cast)
+                                .filter(FreeTextStaticProperty::isReconfigurable)
+                                .forEach(sp -> {
+                                    if (sp.getInternalName().equals("load")) {
+                                        sp.setValue(load);
+                                    }
+                                }));
+
+                        line = reconfigurePipeline(pipeline, nrRuns);
+
+                        try {
+                            Thread.sleep(timeToSleepBeforeManipulation);
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                } else {
+                    pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+                            .filter(FreeTextStaticProperty.class::isInstance)
+                            .map(FreeTextStaticProperty.class::cast)
+                            .filter(FreeTextStaticProperty::isReconfigurable)
+                            .forEach(sp -> {
+                                if (sp.getInternalName().equals("load")) {
+                                    sp.setValue("0.9");
+                                }
+                            }));
+
+                    line = reconfigurePipeline(pipeline, nrRuns);
+                }
             }
         }
 
@@ -149,7 +186,33 @@ public class GenericTest implements Test{
                 long sleepTime = timeToSleepAfterManipulation + (long)(Math.random()*10000);
                 Thread.sleep(sleepTime);
             }else {
+
                 Thread.sleep(timeToSleepAfterManipulation);
+
+                if (shouldBeReconfigured) {
+                    // Reset to baseload
+                    if (testType.equals("Offloading")) {
+                        System.out.printf("Set CPU load to: %f percent", Float.parseFloat(cpuEndLoad) * 100);
+
+                        List<Pipeline> pipelines = client.pipelines().all();
+                        Pipeline pipelineLatest = pipelines.stream()
+                                .filter(p -> p.getName().equals(pipelineName))
+                                .findFirst()
+                                .orElseThrow(() -> new RuntimeException("Pipeline not found"));
+
+                        pipelineLatest.getSepas().forEach(p -> p.getStaticProperties().stream()
+                                .filter(FreeTextStaticProperty.class::isInstance)
+                                .map(FreeTextStaticProperty.class::cast)
+                                .filter(FreeTextStaticProperty::isReconfigurable)
+                                .forEach(sp -> {
+                                    if (sp.getInternalName().equals("load")) {
+                                        sp.setValue(cpuEndLoad);
+                                    }
+                                }));
+                        line = reconfigurePipeline(pipelineLatest, nrRuns);
+                        Thread.sleep(timeToSleepBeforeManipulation);
+                    }
+                }
             }
         } catch (InterruptedException e) {
             e.printStackTrace();
@@ -176,6 +239,17 @@ public class GenericTest implements Test{
         }
     }
 
+    /**
+     * Submits the given pipeline to the reconfigure endpoint of the client and builds the result row
+     * describing the outcome.
+     *
+     * @param pipeline pipeline whose current static property values are submitted
+     * @param nrRuns   run counter, copied into the result row
+     * @return row {"Reconfiguration triggered", nrRuns, reconfigurableValue - 1, success flag}
+     */
+    private Object[] reconfigurePipeline(Pipeline pipeline, int nrRuns) {
+        // boxed once so the printed value and the value in the row are the same
+        Object appliedValue = this.reconfigurableValue - 1;
+        System.out.println("Reconfiguration triggered with value " + appliedValue);
+
+        PipelineOperationStatus status = client.pipelines().reconfigure(pipeline);
+        System.out.println(status.getTitle());
+
+        // last slot carries the outcome reported by the backend
+        return new Object[]{"Reconfiguration triggered", nrRuns, appliedValue, status.isSuccess()};
+    }
+
     private void executeReconfiguration(){
         if (!pipeline.isRunning()) {
             PipelineOperationStatus startMessage = client.pipelines().start(pipeline);
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/PrioOffloadingTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/PrioOffloadingTest.java
new file mode 100644
index 0000000..8bd4469
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/PrioOffloadingTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance.performancetest;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.messaging.mqtt.MqttConsumer;
+import org.apache.streampipes.model.Tuple2;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
+import org.apache.streampipes.model.grounding.TopicDefinition;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class PrioOffloadingTest implements Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrioOffloadingTest.class.getCanonicalName());
+
+    // When true, execute() stops both pipelines at the end of a run (if both are still running)
+    private boolean stopPipeline;
+    // Pause passed to Thread.sleep (milliseconds) after each applied CPU step
+    private final long timeToSleepBetweenSteps;
+    // Client used to list, start, stop and reconfigure pipelines
+    private final StreamPipesClient client;
+
+    // Names used to look up the two pipelines via the client
+    private final String pipelineNameLow;
+    private final String pipelineNameHigh;
+
+    // Most recently fetched pipeline objects; re-read by name several times during a run
+    private Pipeline pipelineLowPrio;
+    private Pipeline pipelineHighPrio;
+
+    // CPU load steps read from the environment: a = low prio value, b = high prio value
+    private final List<Tuple2<String, String>> cpuSteps;
+
+    /**
+     * Creates the test, connects to StreamPipes, resolves both pipelines by name and reads the CPU load
+     * steps from the environment.
+     *
+     * @param pipelineNameLow         name of the low prio pipeline
+     * @param pipelineNameHigh        name of the high prio pipeline
+     * @param stopPipeline            whether both pipelines are stopped at the end of a run
+     * @param timeToSleepBetweenSteps pause in milliseconds after each applied CPU step
+     * @throws RuntimeException if one of the pipelines cannot be found or the two CPU step lists
+     *                          differ in length
+     */
+    public PrioOffloadingTest(String pipelineNameLow,
+                              String pipelineNameHigh,
+                              boolean stopPipeline,
+                              long timeToSleepBetweenSteps) {
+
+        this.pipelineNameLow = pipelineNameLow;
+        this.pipelineNameHigh = pipelineNameHigh;
+        this.stopPipeline = stopPipeline;
+        this.timeToSleepBetweenSteps = timeToSleepBetweenSteps;
+
+        // Create an instance of the StreamPipes client
+        // (must happen before the lookups below, which go through the client)
+        client = createStreamPipesClient();
+
+        pipelineLowPrio = findPipelineByName(pipelineNameLow);
+        pipelineHighPrio = findPipelineByName(pipelineNameHigh);
+
+        this.cpuSteps = loadFromEnv();
+    }
+
+    /**
+     * Sets whether {@link #execute(int)} may stop both pipelines at the end of a run.
+     *
+     * @param stopPipeline {@code true} to allow stopping the pipelines after the run
+     */
+    @Override
+    public void setStopPipeline(boolean stopPipeline) {
+        this.stopPipeline = stopPipeline;
+    }
+
+    /**
+     * Runs one priority-based offloading scenario.
+     * <p>
+     * Sequence: start both pipelines if neither is running, start the CPU step thread and the MQTT
+     * observer thread and wait for both, reconfigure the high prio pipeline, sleep for the rest of the
+     * offloading interval (30 min minus the elapsed time and the 5 min final segment), reset both
+     * pipelines to the first CPU step, sleep for the final 5 min segment and finally stop both pipelines
+     * if requested and both are running.
+     * <p>
+     * An interrupt of the calling thread only cuts the current wait short; the run continues, as before.
+     *
+     * @param nrRuns number of the current run; not used by this test
+     */
+    @Override
+    public void execute(int nrRuns) {
+
+        Instant start = Instant.now();
+
+        //Start Pipeline
+        if (!pipelineLowPrio.isRunning() && !pipelineHighPrio.isRunning()) {
+            startingPipelines();
+            // provide some time prior to reconfigure values
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while waiting after pipeline start", e);
+            }
+        }
+
+        Thread initialCpuStepThread = getInitialCpuStepThread();
+        // start offloading observer thread
+        Thread offloadingObserverThread = offloadingObserverThread(initialCpuStepThread);
+
+        initialCpuStepThread.start();
+        offloadingObserverThread.start();
+
+        try {
+
+            LOG.info("Wait for offloading to take place");
+            initialCpuStepThread.join();
+            offloadingObserverThread.join();
+
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for offloading to take place", e);
+        }
+
+        Instant end = Instant.now();
+        Duration elapsed = Duration.between(start, end);
+        Duration remaining = Duration.ofMinutes(30).minus(elapsed.plus(Duration.ofMinutes(5)));
+        if (remaining.isNegative()) {
+            // Thread.sleep rejects negative values with an IllegalArgumentException, which would abort
+            // the run before the pipelines are reset and stopped
+            LOG.warn("Offloading took {} min, which exceeds the offloading interval; skipping remaining wait",
+                    elapsed.toMinutes());
+            remaining = Duration.ZERO;
+        }
+
+        adaptHighPrioPipelineCpuLoad();
+
+        try {
+            LOG.info("Sleep for remaining duration of offloading interval: {} min", remaining.toMinutes());
+            Thread.sleep(remaining.toMillis());
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while sleeping for the remaining offloading interval", e);
+        }
+
+        adaptHighLowPrioPipelineCpuLoad();
+
+        Duration finalSegment =  Duration.ofMinutes(5);
+        try {
+            LOG.info("Sleep for final segment of offloading test: {} min", finalSegment.toMinutes());
+            Thread.sleep(finalSegment.toMillis());
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while sleeping for the final segment", e);
+        }
+
+        //Stop Pipeline
+        if (stopPipeline && pipelineLowPrio.isRunning() && pipelineHighPrio.isRunning()) {
+            stoppingPipeline(pipelineLowPrio);
+            stoppingPipeline(pipelineHighPrio);
+        }
+    }
+
+
+    // Helpers
+
+    private void adaptHighPrioPipelineCpuLoad() {
+
+        // Adjust high prio pipeline CPU load, leave low prio pipeline at given CPU load
+        pipelineHighPrio = findPipelineByName(pipelineNameHigh);
+
+        LOG.info("Prepare and reconfigure high prio pipeline {} with {} on node {}",
+                pipelineHighPrio.getName(),
+                "CPU Burner",
+                findDeploymentTargetByProcessorName(pipelineHighPrio, "CPU Burner"));
+        String highPrioCpuStepBeforeOffloading = cpuSteps.get(1).b;
+        prepareAndReconfigurePipeline(pipelineHighPrio, "load", highPrioCpuStepBeforeOffloading);
+    }
+
+    private void adaptHighLowPrioPipelineCpuLoad() {
+        LOG.info("Reduce CPU load for both high and low prio pipeline");
+
+        // Adjust high prio pipeline CPU load, leave low prio pipeline at given CPU load
+        pipelineLowPrio = findPipelineByName(pipelineNameLow);
+        pipelineHighPrio = findPipelineByName(pipelineNameHigh);
+
+        LOG.info("Prepare and reconfigure low prio pipeline {} with {} on node {}",
+                pipelineLowPrio.getName(),
+                "CPU Burner",
+                findDeploymentTargetByProcessorName(pipelineLowPrio, "CPU Burner"));
+        String lowPrioCpuStepBeforeOffloading = cpuSteps.get(0).a;
+        prepareAndReconfigurePipeline(pipelineLowPrio, "load", lowPrioCpuStepBeforeOffloading);
+
+        LOG.info("Prepare and reconfigure high prio pipeline {} with {} on node {}",
+                pipelineHighPrio.getName(),
+                "CPU Burner",
+                findDeploymentTargetByProcessorName(pipelineHighPrio, "CPU Burner"));
+        String highPrioCpuStepBeforeOffloading = cpuSteps.get(0).b;
+        prepareAndReconfigurePipeline(pipelineHighPrio, "load", highPrioCpuStepBeforeOffloading);
+    }
+
+    private void startingPipelines() {
+        PipelineOperationStatus startStatusLowPrio = client.pipelines().start(pipelineLowPrio);
+        PipelineOperationStatus startStatusHighPrio = client.pipelines().start(pipelineHighPrio);
+
+        String lowPrioNode = findDeploymentTargetByProcessorName(pipelineLowPrio, "CPU Burner");
+        String highPrioNode = findDeploymentTargetByProcessorName(pipelineHighPrio, "CPU Burner");
+
+        LOG.info("Start status low prio pipeline {} with {} on node {}",
+                startStatusLowPrio.getTitle(),
+                "CPU Burner",
+                lowPrioNode);
+        if (startStatusLowPrio.isSuccess()) {
+            pipelineLowPrio.setRunning(true);
+        }
+
+        LOG.info("Start status high prio pipeline {} with {} on node {}",
+                startStatusHighPrio.getTitle(),
+                "CPU Burner",
+                highPrioNode);
+        if (startStatusHighPrio.isSuccess()) {
+            pipelineHighPrio.setRunning(true);
+        }
+    }
+
+    /**
+     * Builds the StreamPipes client from the environment variables SP_USER, SP_API_KEY, SP_HOST and
+     * SP_PORT.
+     *
+     * @return the configured client
+     * @throws NumberFormatException if SP_PORT is unset or not an integer
+     */
+    private StreamPipesClient createStreamPipesClient() {
+        String user = System.getenv("SP_USER");
+        String apiKey = System.getenv("SP_API_KEY");
+        StreamPipesCredentials credentials = StreamPipesCredentials.from(user, apiKey);
+
+        String host = System.getenv("SP_HOST");
+        int port = Integer.parseInt(System.getenv("SP_PORT"));
+
+        // NOTE(review): the trailing `true` presumably disables HTTPS - confirm against StreamPipesClient.create
+        return StreamPipesClient.create(host, port, credentials, true);
+    }
+
+    /**
+     * Writes the new value into the matching static properties of the pipeline and then submits the
+     * pipeline for reconfiguration.
+     *
+     * @param pipeline                pipeline to modify and reconfigure
+     * @param fstInternalName         internal name of the free text static property to change
+     * @param fstReconfigurationValue new value of that property
+     */
+    private void prepareAndReconfigurePipeline(Pipeline pipeline,
+                                               String fstInternalName,
+                                               String fstReconfigurationValue) {
+        preparingPipeline(pipeline, fstInternalName, fstReconfigurationValue);
+        reconfiguringPipeline(pipeline, fstReconfigurationValue);
+    }
+
+    /**
+     * Sets the given value on every reconfigurable free text static property with the given internal
+     * name, across all processors of the pipeline. Nothing is sent to the backend here.
+     *
+     * @param pipeline                pipeline whose processors are updated in place
+     * @param fstInternalName         internal name of the property to change
+     * @param fstReconfigurationValue new value; must parse as a float because it is logged in percent
+     */
+    private void preparingPipeline(Pipeline pipeline, String fstInternalName, String fstReconfigurationValue) {
+        String pipelineName = pipeline.getName();
+        float loadInPercent = Float.parseFloat(fstReconfigurationValue) * 100;
+        LOG.info("Set CPU load for {} pipeline to: {} percent", pipelineName, loadInPercent);
+
+        pipeline.getSepas().stream()
+                .flatMap(processor -> processor.getStaticProperties().stream())
+                .filter(FreeTextStaticProperty.class::isInstance)
+                .map(FreeTextStaticProperty.class::cast)
+                // only reconfigurable properties are compared by name
+                .filter(property -> property.isReconfigurable()
+                        && property.getInternalName().equals(fstInternalName))
+                .forEach(property -> property.setValue(fstReconfigurationValue));
+    }
+
+    /**
+     * Submits the pipeline for reconfiguration and logs the outcome.
+     *
+     * @param pipeline                pipeline to reconfigure
+     * @param fstReconfigurationValue value that was applied beforehand; only used for logging
+     */
+    private void reconfiguringPipeline(Pipeline pipeline, String fstReconfigurationValue) {
+        LOG.info("Reconfiguration triggered with value {}", fstReconfigurationValue);
+        PipelineOperationStatus statusReconfiguration = client.pipelines().reconfigure(pipeline);
+        // the success message used to be logged only when the reconfiguration had failed
+        if (statusReconfiguration.isSuccess()) {
+            LOG.info("Pipeline {} successfully reconfigured", pipeline.getName());
+        } else {
+            LOG.warn("Pipeline {} could not be reconfigured: {}",
+                    pipeline.getName(),
+                    statusReconfiguration.getTitle());
+        }
+    }
+
+    /**
+     * Stops the given pipeline and, on success, marks that same pipeline object as not running.
+     *
+     * @param pipeline pipeline to stop
+     */
+    private void stoppingPipeline(Pipeline pipeline) {
+        PipelineOperationStatus stopStatus = client.pipelines().stop(pipeline);
+        if (stopStatus.isSuccess()) {
+            LOG.info("Stop status pipeline: {}", stopStatus.getTitle());
+            // update the pipeline that was actually stopped, not always the low prio one
+            pipeline.setRunning(false);
+        } else {
+            // pass the name as an argument so the {} placeholder is filled
+            LOG.warn("Pipeline {} could not be stopped.", stopStatus.getPipelineName());
+        }
+    }
+
+    /**
+     * Fetches all pipelines through the client and returns the first one with the given name.
+     *
+     * @param pipelineName name to look for
+     * @return the first pipeline whose name equals {@code pipelineName}
+     * @throws RuntimeException if no pipeline has that name
+     */
+    private Pipeline findPipelineByName(String pipelineName) {
+        for (Pipeline candidate : client.pipelines().all()) {
+            if (candidate.getName().equals(pipelineName)) {
+                return candidate;
+            }
+        }
+        throw new RuntimeException("Pipeline not found");
+    }
+
+    /**
+     * Returns the deployment target node id of the first processor in the pipeline with the given name.
+     *
+     * @param pipeline      pipeline whose processors are searched
+     * @param processorName processor name to look for
+     * @return deployment target node id of the matching processor
+     * @throws RuntimeException if the pipeline contains no processor with that name
+     */
+    private String findDeploymentTargetByProcessorName(Pipeline pipeline, String processorName) {
+        for (DataProcessorInvocation processor : pipeline.getSepas()) {
+            if (processor.getName().equals(processorName)) {
+                return processor.getDeploymentTargetNodeId();
+            }
+        }
+        throw new RuntimeException("Processor not found");
+    }
+
+    /**
+     * Creates (but does not start) a thread that subscribes to the "Offloading" topic of the broker
+     * given by SP_LOGGING_MQTT_URL. As soon as a comma-separated event contains the field
+     * "offloading done", the CPU step thread is interrupted and the consumer is closed.
+     *
+     * @param cpuStepThread thread to interrupt once offloading has been observed
+     * @return the not yet started observer thread
+     */
+    private Thread offloadingObserverThread(Thread cpuStepThread) {
+        LOG.info("Offloading observer thread started");
+        return new Thread() {
+            @Override
+            public synchronized void run() {
+                try {
+                    URI brokerUri = new URI(System.getenv("SP_LOGGING_MQTT_URL"));
+                    MqttTransportProtocol protocol = makeMqttTransportProtocol(brokerUri, "Offloading");
+
+                    MqttConsumer mqttConsumer = new MqttConsumer();
+                    mqttConsumer.connect(protocol, payload -> {
+                        String[] fields = new String(payload, StandardCharsets.UTF_8).split(",");
+                        if (!Arrays.asList(fields).contains("offloading done")) {
+                            // unrelated event, keep listening
+                            return;
+                        }
+
+                        // NOTE(review): assumes the event has at least five fields - confirm the log format
+                        LOG.info("Low prio pipeline element offloaded {}", fields[4]);
+                        cpuStepThread.interrupt();
+                        try {
+                            mqttConsumer.close();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    });
+                } catch (URISyntaxException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+    }
+
+    /**
+     * Creates (but does not start) the thread that applies the configured CPU steps to both pipelines.
+     * <p>
+     * Until it is interrupted, the thread walks through {@code cpuSteps} again and again: for every step
+     * it reconfigures the low prio and then the high prio pipeline and sleeps for
+     * {@code timeToSleepBetweenSteps} milliseconds. From the second step of each pass on, both pipelines
+     * are re-read by name before they are reconfigured. With fewer than two steps nothing is
+     * reconfigured and the thread just idles until it is interrupted.
+     *
+     * @return the not yet started CPU step thread
+     */
+    private Thread getInitialCpuStepThread() {
+        LOG.info("Initial CPU step thread started");
+        return new Thread(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    int counter = 0;
+                    if (cpuSteps.size() > 1) {
+                        for (Tuple2<String, String> cpuStep : cpuSteps) {
+
+                            if (counter > 0) {
+                                LOG.info("Update pipeline locations");
+                                pipelineLowPrio = findPipelineByName(pipelineNameLow);
+                                pipelineHighPrio = findPipelineByName(pipelineNameHigh);
+                            }
+
+                            String lowPrioCpuStep = cpuStep.a;
+                            String highPrioCpuStep = cpuStep.b;
+
+                            String lowPrioNode = findDeploymentTargetByProcessorName(pipelineLowPrio, "CPU Burner");
+                            String highPrioNode = findDeploymentTargetByProcessorName(pipelineHighPrio, "CPU Burner");
+
+                            LOG.info("Prepare and reconfigure low prio pipeline {} with {} on node {}",
+                                    pipelineLowPrio.getName(),
+                                    "CPU Burner",
+                                    lowPrioNode);
+                            prepareAndReconfigurePipeline(pipelineLowPrio, "load", lowPrioCpuStep);
+
+                            LOG.info("Prepare and reconfigure high prio pipeline {} with {} on node {}",
+                                    pipelineHighPrio.getName(),
+                                    "CPU Burner",
+                                    highPrioNode);
+                            prepareAndReconfigurePipeline(pipelineHighPrio, "load", highPrioCpuStep);
+
+                            counter++;
+
+                            Thread.sleep(timeToSleepBetweenSteps);
+                        }
+                    } else {
+                        // No steps to apply: wait for the interrupt without spinning at full CPU load
+                        Thread.sleep(1000);
+                    }
+                } catch (InterruptedException e) {
+                    LOG.info("Interrupted cpu steps thread");
+                    // restore the flag so the loop condition ends the thread
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+    }
+
+    /**
+     * Builds an MQTT transport protocol description from the host and port of the URI and the topic.
+     *
+     * @param uri   broker address; only host and port are read
+     * @param topic name of the topic to use
+     * @return the populated transport protocol
+     */
+    private MqttTransportProtocol makeMqttTransportProtocol(URI uri, String topic) {
+        MqttTransportProtocol protocol = new MqttTransportProtocol();
+        protocol.setBrokerHostname(uri.getHost());
+        protocol.setPort(uri.getPort());
+        protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
+        return protocol;
+    }
+
+    /**
+     * Reads the CPU load steps of both pipelines from the environment variables
+     * OFFLOADING_CPU_LOAD_STEPS_LOW_PRIO and OFFLOADING_CPU_LOAD_STEPS_HIGH_PRIO (values separated by
+     * ';') and pairs them up by position.
+     *
+     * @return one tuple per step: a = low prio value, b = high prio value
+     * @throws NullPointerException if one of the variables is not set
+     * @throws RuntimeException     if the two lists differ in length
+     */
+    private List<Tuple2<String, String>> loadFromEnv() {
+        String[] cpuLoadStepsLowPrio = readCpuLoadSteps("OFFLOADING_CPU_LOAD_STEPS_LOW_PRIO");
+        String[] cpuLoadStepsHighPrio = readCpuLoadSteps("OFFLOADING_CPU_LOAD_STEPS_HIGH_PRIO");
+
+        if (cpuLoadStepsLowPrio.length != cpuLoadStepsHighPrio.length) {
+            throw new RuntimeException("CPU load steps array must be of equal size");
+        }
+
+        int length = cpuLoadStepsLowPrio.length;
+        List<Tuple2<String, String>> cpuSteps = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            cpuSteps.add(new Tuple2<>(cpuLoadStepsLowPrio[i], cpuLoadStepsHighPrio[i]));
+        }
+        return cpuSteps;
+    }
+
+    // Reads a ';'-separated environment variable and fails with a descriptive message when it is unset
+    private static String[] readCpuLoadSteps(String variableName) {
+        String rawSteps = System.getenv(variableName);
+        if (rawSteps == null) {
+            // same exception type as the former implicit failure, now naming the missing variable
+            throw new NullPointerException("Environment variable " + variableName + " is not set");
+        }
+        return rawSteps.split(";");
+    }
+
+}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/TestConsumer.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/TestConsumer.java
new file mode 100644
index 0000000..04f4284
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/TestConsumer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance.performancetest;
+
+import org.apache.streampipes.messaging.mqtt.MqttConsumer;
+import org.apache.streampipes.model.Tuple2;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
+import org.apache.streampipes.model.grounding.TopicDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestConsumer {
+
+    // Logger of this class (it used to be created for CountBasedOffloadingTest, which mislabelled all output)
+    private static final Logger LOG = LoggerFactory.getLogger(TestConsumer.class.getCanonicalName());
+    // Number of offloadings every observed origin device has to reach before the observer stops
+    private static final int maxOffloadingActions = 2;
+
+    // Set by the observer callback once every observed device reached maxOffloadingActions
+    private static volatile boolean maxOffloadingsReached = false;
+    // CPU load values that the step thread logs one after another
+    private static final String[] cpuSteps = new String[]{"0.2","0.5", "0.9"};
+
+    /**
+     * Creates and starts the CPU step thread and the offloading observer thread, waits for both to
+     * finish and then logs that the main flow continues.
+     *
+     * @param args not used
+     * @throws Exception declared for convenience; an interrupt while waiting is caught and printed
+     */
+    public static void main(String ... args) throws Exception {
+
+        // the observer needs the step thread so it can interrupt it later
+        Thread rampUp = getInitialCpuStepThread();
+        Thread observer = offloadingObserverThread(rampUp);
+        rampUp.start();
+        observer.start();
+
+        LOG.info("Wait until max. number offloadings is completed: {}", maxOffloadingActions);
+        try {
+            rampUp.join();
+            observer.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        LOG.info("Continue Outside...");
+
+    }
+
+
+    /**
+     * Creates (but does not start) a thread that subscribes to the "Offloading" topic of the broker
+     * given by SP_LOGGING_MQTT_URL and counts "offloading done" events per origin device. Once every
+     * device seen so far has reached {@code maxOffloadingActions}, the ramp-up thread is interrupted
+     * and the consumer is closed.
+     *
+     * @param initialRampUpThread thread to interrupt when the maximum is reached
+     * @return the not yet started observer thread
+     */
+    private static Thread offloadingObserverThread(Thread initialRampUpThread) {
+        LOG.info("Offloading observer thread started");
+        return new Thread() {
+            @Override
+            public synchronized void run() {
+
+                // origin device id -> number of offloadings observed for it
+                Map<String, Integer> offloadingsPerDevice = new HashMap<>();
+
+                try {
+                    URI brokerUri = new URI(System.getenv("SP_LOGGING_MQTT_URL"));
+                    MqttTransportProtocol protocol = makeMqttTransportProtocol(brokerUri, "Offloading");
+
+                    MqttConsumer mqttConsumer = new MqttConsumer();
+                    mqttConsumer.connect(protocol, payload -> {
+                        String[] fields = new String(payload, StandardCharsets.UTF_8).split(",");
+                        if (!Arrays.asList(fields).contains("offloading done")) {
+                            // unrelated event, keep listening
+                            return;
+                        }
+
+                        // NOTE(review): assumes at least five fields with the device id at index 1 - confirm the log format
+                        LOG.info("Pipeline element offloaded {}", fields[4]);
+
+                        String originDeviceId = fields[1];
+                        int offloadings = offloadingsPerDevice.merge(originDeviceId, 1, Integer::sum);
+                        LOG.info("Current number of offloadings for {}: {}/{}",
+                                originDeviceId, offloadings, maxOffloadingActions);
+
+                        boolean allDevicesDone = offloadingsPerDevice.values()
+                                .stream()
+                                .allMatch(count -> count >= maxOffloadingActions);
+                        if (!allDevicesDone) {
+                            return;
+                        }
+
+                        LOG.info("Reached max offloading actions");
+                        maxOffloadingsReached = true;
+                        initialRampUpThread.interrupt();
+                        try {
+                            mqttConsumer.close();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    });
+                } catch (URISyntaxException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds an MQTT transport protocol description from the host and port of the URI and the topic.
+     *
+     * @param uri   broker address; only host and port are read
+     * @param topic name of the topic to use
+     * @return the populated transport protocol
+     */
+    private static MqttTransportProtocol makeMqttTransportProtocol(URI uri, String topic) {
+        MqttTransportProtocol protocol = new MqttTransportProtocol();
+        protocol.setBrokerHostname(uri.getHost());
+        protocol.setPort(uri.getPort());
+        protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
+        return protocol;
+    }
+
+    /**
+     * Creates (but does not start) a thread that logs every entry of {@code cpuSteps} once, sleeping
+     * five seconds after each, and afterwards idles until it is interrupted.
+     *
+     * @return the not yet started CPU step thread
+     */
+    private static Thread getInitialCpuStepThread() {
+        LOG.info("Initial CPU step thread started");
+        return new Thread(() -> {
+            int counter = 0;
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    if (cpuSteps.length > 1 && counter != cpuSteps.length) {
+                        for (String cpuStep : cpuSteps) {
+
+                            LOG.info("Reconfigure wiht {}", cpuStep);
+
+                            counter++;
+
+                            Thread.sleep(5000);
+                        }
+                    } else {
+                        // All steps applied: wait for the interrupt without spinning at full CPU load
+                        Thread.sleep(1000);
+                    }
+                } catch (InterruptedException e) {
+                    LOG.info("Interrupted cpu steps thread");
+                    // restore the flag so the loop condition ends the thread
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+    }
+
+}