You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/12/01 15:19:14 UTC
[incubator-streampipes] branch edge-extensions updated: latest changes to performance evaluation
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new d03e36a latest changes to performance evaluation
d03e36a is described below
commit d03e36a0a8a314bd40442400b756909f019ed2fa
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Nov 30 16:05:42 2021 +0100
latest changes to performance evaluation
---
docker-build.sh | 4 +-
streampipes-node-controller/Dockerfile | 13 +-
streampipes-node-controller/aarch64.Dockerfile | 13 +-
streampipes-node-controller/arm.Dockerfile | 13 +-
.../node/controller/config/EnvConfigParam.java | 1 +
.../node/controller/config/NodeConfiguration.java | 14 +
.../offloading/OffloadingPolicyManager.java | 17 +-
.../strategies/OffloadingStrategyFactory.java | 23 +-
...ionStrategy.java => SelectionStrategyType.java} | 21 +-
.../ThresholdViolationOffloadingPolicy.java | 33 +-
.../selection/PrioritySelectionStrategy.java | 65 ++--
.../selection/RandomSelectionStrategy.java | 9 +-
.../strategies/selection/SelectionStrategy.java | 2 +-
.../PipelineElementReconfigurationHandler.java | 6 +
.../management/relay/DataStreamRelayManager.java | 6 +
.../statscollector/DockerStatsCollector.java | 2 +-
.../EdgeExtensionsGenericPerformanceTest.java | 7 +-
.../streampipes/performance/TestFactory.java | 35 +-
.../performancetest/CountBasedOffloadingTest.java | 327 ++++++++++++++++++
.../performance/performancetest/GenericTest.java | 120 +++++--
.../performancetest/PrioOffloadingTest.java | 379 +++++++++++++++++++++
.../performance/performancetest/TestConsumer.java | 155 +++++++++
22 files changed, 1154 insertions(+), 111 deletions(-)
diff --git a/docker-build.sh b/docker-build.sh
index 1cdde4a..7081b26 100755
--- a/docker-build.sh
+++ b/docker-build.sh
@@ -28,7 +28,7 @@ docker_build_amd(){
docker_build_arm(){
echo "Docker build for arm ..."
- docker buildx build \
+ docker buildx build --no-cache --pull \
--platform linux/arm/v7 \
-t $repo/$1:$version-armv7 \
-f $2/arm.Dockerfile $2 --load
@@ -36,7 +36,7 @@ docker_build_arm(){
docker_build_aarch64(){
echo "Docker build for aarch64 ..."
- docker buildx build \
+ docker buildx build --no-cache --pull \
--platform linux/arm64 \
-t $repo/$1:$version-aarch64 \
-f $2/aarch64.Dockerfile $2 --load
diff --git a/streampipes-node-controller/Dockerfile b/streampipes-node-controller/Dockerfile
index 7ded02f..8c3a6b4 100644
--- a/streampipes-node-controller/Dockerfile
+++ b/streampipes-node-controller/Dockerfile
@@ -19,7 +19,7 @@ FROM $BASE_IMAGE
MAINTAINER dev@streampipes.apache.org
-EXPOSE 7077
+EXPOSE 7077 9010
ENV CONSUL_LOCATION consul
# Comment:
@@ -31,4 +31,13 @@ RUN set -x; \
COPY target/streampipes-node-controller.jar /streampipes-node-controller.jar
-ENTRYPOINT ["java", "-jar", "/streampipes-node-controller.jar"]
+ENTRYPOINT ["java", \
+ "-Djava.rmi.server.hostname=ipe-streamnuc-2.fzi.de", \
+ "-Dcom.sun.management.jmxremote", \
+ "-Dcom.sun.management.jmxremote.port=9010", \
+ "-Dcom.sun.management.jmxremote.rmi.port=9010", \
+ "-Dcom.sun.management.jmxremote.local.only=false", \
+ "-Dcom.sun.management.jmxremote.ssl=false", \
+ "-Dcom.sun.management.jmxremote.authenticate=false", \
+ "-jar", \
+ "/streampipes-node-controller.jar"]
diff --git a/streampipes-node-controller/aarch64.Dockerfile b/streampipes-node-controller/aarch64.Dockerfile
index 856f64a..b51677e 100644
--- a/streampipes-node-controller/aarch64.Dockerfile
+++ b/streampipes-node-controller/aarch64.Dockerfile
@@ -22,7 +22,7 @@ RUN apt -y update; \
FROM $BASE_IMAGE
MAINTAINER dev@streampipes.apache.org
-EXPOSE 7077
+EXPOSE 7077 9010
ENV CONSUL_LOCATION consul
COPY --from=build-dev /usr/bin/qemu-aarch64-static /usr/bin
@@ -34,4 +34,13 @@ RUN set -ex; \
COPY target/streampipes-node-controller.jar /streampipes-node-controller.jar
-ENTRYPOINT ["java", "-jar", "/streampipes-node-controller.jar"]
+ENTRYPOINT ["java", \
+ "-Djava.rmi.server.hostname=0.0.0.0", \
+ "-Dcom.sun.management.jmxremote", \
+ "-Dcom.sun.management.jmxremote.port=9010", \
+ "-Dcom.sun.management.jmxremote.rmi.port=9010", \
+ "-Dcom.sun.management.jmxremote.local.only=false", \
+ "-Dcom.sun.management.jmxremote.ssl=false", \
+ "-Dcom.sun.management.jmxremote.authenticate=false", \
+ "-jar", \
+ "/streampipes-node-controller.jar"]
diff --git a/streampipes-node-controller/arm.Dockerfile b/streampipes-node-controller/arm.Dockerfile
index a155288..01d639b 100644
--- a/streampipes-node-controller/arm.Dockerfile
+++ b/streampipes-node-controller/arm.Dockerfile
@@ -22,7 +22,7 @@ RUN apt -y update; \
FROM $BASE_IMAGE
MAINTAINER dev@streampipes.apache.org
-EXPOSE 7077
+EXPOSE 7077 9010
ENV CONSUL_LOCATION consul
COPY --from=build-dev /usr/bin/qemu-arm-static /usr/bin
@@ -38,4 +38,13 @@ RUN set -ex; \
COPY target/streampipes-node-controller.jar /streampipes-node-controller.jar
-ENTRYPOINT ["java", "-jar", "/streampipes-node-controller.jar"]
+ENTRYPOINT ["java", \
+ "-Djava.rmi.server.hostname=ipe-streampi-02.fzi.de", \
+ "-Dcom.sun.management.jmxremote", \
+ "-Dcom.sun.management.jmxremote.port=9010", \
+ "-Dcom.sun.management.jmxremote.rmi.port=9010", \
+ "-Dcom.sun.management.jmxremote.local.only=false", \
+ "-Dcom.sun.management.jmxremote.ssl=false", \
+ "-Dcom.sun.management.jmxremote.authenticate=false", \
+ "-jar", \
+ "/streampipes-node-controller.jar"]
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
index c8ecd58..45c2e62 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
@@ -53,6 +53,7 @@ public enum EnvConfigParam {
SUPPORTED_PIPELINE_ELEMENTS("SP_SUPPORTED_PIPELINE_ELEMENTS", ""),
AUTO_OFFLOADING("SP_AUTO_OFFLOADING_ACTIVATED", "false"),
AUTO_OFFLOADING_STRATEGY("SP_AUTO_OFFLOADING_STRATEGY", "default"),
+ AUTO_OFFLOADING_SELECTION_STRATEGY("SP_AUTO_OFFLOADING_SELECTION_STRATEGY", "random"),
AUTO_OFFLOADING_THRESHOLD_IN_PERCENT("SP_AUTO_OFFLOADING_THRESHOLD_IN_PERCENT", "90"),
AUTO_OFFLOADING_MAX_NUM_VIOLATIONS("SP_AUTO_OFFLOADING_MAX_NUM_VIOLATIONS", "4"),
NODE_STORAGE_PATH("SP_NODE_STORAGE_PATH", "/var/lib/streampipes"),
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
index 59d402f..6c487e8 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
@@ -21,6 +21,7 @@ import org.apache.commons.validator.routines.UrlValidator;
import org.apache.streampipes.model.node.resources.fielddevice.FieldDeviceAccessResource;
import org.apache.streampipes.node.controller.config.utils.ConfigUtils;
import org.apache.streampipes.node.controller.management.offloading.strategies.OffloadingStrategyType;
+import org.apache.streampipes.node.controller.management.offloading.strategies.SelectionStrategyType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@ public final class NodeConfiguration {
private static String consulHost;
private static boolean autoOffloadingActivated;
private static OffloadingStrategyType autoOffloadingStrategy;
+ private static SelectionStrategyType autoOffloadingSelectionStrategy;
private static float autoOffloadingThresholdInPercent;
private static int autoOffloadingMaxNumViolations;
private static String nodeStoragePath;
@@ -313,6 +315,14 @@ public final class NodeConfiguration {
NodeConfiguration.autoOffloadingStrategy = autoOffloadingStrategy;
}
+ public static SelectionStrategyType getAutoOffloadingSelectionStrategy() {
+ return autoOffloadingSelectionStrategy;
+ }
+
+ public static void setAutoOffloadingSelectionStrategy(SelectionStrategyType autoOffloadingSelectionStrategy) {
+ NodeConfiguration.autoOffloadingSelectionStrategy = autoOffloadingSelectionStrategy;
+ }
+
public static float getAutoOffloadingThresholdInPercent() {
return autoOffloadingThresholdInPercent;
}
@@ -564,6 +574,10 @@ public final class NodeConfiguration {
configMap.put(envKey, value);
setAutoOffloadingStrategy(OffloadingStrategyType.fromString(value));
break;
+ case AUTO_OFFLOADING_SELECTION_STRATEGY:
+ configMap.put(envKey, value);
+ setAutoOffloadingSelectionStrategy(SelectionStrategyType.fromString(value));
+ break;
case AUTO_OFFLOADING_THRESHOLD_IN_PERCENT:
configMap.put(envKey, value);
setAutoOffloadingThresholdInPercent(Float.parseFloat(value));
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index 2fc1271..e834687 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.node.NodeInfoDescription;
import org.apache.streampipes.model.node.monitor.ResourceMetrics;
-import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
import org.apache.streampipes.node.controller.config.NodeConfiguration;
import org.apache.streampipes.node.controller.management.offloading.strategies.OffloadingStrategy;
import org.apache.streampipes.node.controller.management.pe.PipelineElementManager;
@@ -44,7 +43,7 @@ public class OffloadingPolicyManager {
private static final String NODE_MANAGEMENT_ONLINE_ENDPOINT = BACKEND_BASE_ROUTE + "/nodes/online";
private final List<OffloadingStrategy<?>> offloadingStrategies = new ArrayList<>();
- private final List<InvocableStreamPipesEntity> unsuccessfullyTriedEntities = new ArrayList<>();
+ private final List<InvocableStreamPipesEntity> blacklistInvocables = new ArrayList<>();
private static OffloadingPolicyManager instance;
private static final Logger LOG = LoggerFactory.getLogger(OffloadingPolicyManager.class.getCanonicalName());
//private static EvaluationLogger logger = EvaluationLogger.getInstance();
@@ -74,26 +73,32 @@ public class OffloadingPolicyManager {
triggerOffloading(violatedPolicies.get(0));
}
//Blacklist of entities is cleared when no policies were violated.
- else unsuccessfullyTriedEntities.clear();
+ else blacklistInvocables.clear();
}
private void triggerOffloading(OffloadingStrategy strategy){
- InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
+ InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.blacklistInvocables);
EvaluationLogger.getInstance().logMQTT("Offloading", "entity to offload selected");
if(offloadEntity != null){
Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
String appId = offloadEntity.getAppId();
+ String runningInstanceId = offloadEntity.getDeploymentRunningInstanceId();
String pipelineName = offloadEntity.getCorrespondingPipeline();
+ int prioScore = offloadEntity.getPriorityScore();
- EvaluationLogger.getInstance().logMQTT("Offloading", "offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId);
+ EvaluationLogger.getInstance().logMQTT(
+ "Offloading",
+ "offloading done",
+ strategy.getOffloadingPolicy().getClass().getSimpleName(),
+ appId + "_prio" + prioScore + "_" + pipelineName);
if(resp.isSuccess()){
LOG.info("Successfully offloaded: {} of pipeline: {}", appId, pipelineName);
strategy.getOffloadingPolicy().reset();
} else{
LOG.warn("Failed to offload: {} of pipeline: {}", appId, pipelineName);
- unsuccessfullyTriedEntities.add(offloadEntity);
+ blacklistInvocables.add(offloadEntity);
}
} else LOG.info("No pipeline element found to offload");
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
index 8e17b6d..b7ce04b 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
@@ -25,7 +25,9 @@ import org.apache.streampipes.node.controller.management.offloading.strategies.p
import org.apache.streampipes.node.controller.management.offloading.strategies.property.CPULoadResourceProperty;
import org.apache.streampipes.node.controller.management.offloading.strategies.property.FreeDiskSpaceResourceProperty;
import org.apache.streampipes.node.controller.management.offloading.strategies.property.FreeMemoryResourceProperty;
+import org.apache.streampipes.node.controller.management.offloading.strategies.selection.PrioritySelectionStrategy;
import org.apache.streampipes.node.controller.management.offloading.strategies.selection.RandomSelectionStrategy;
+import org.apache.streampipes.node.controller.management.offloading.strategies.selection.SelectionStrategy;
import java.util.ArrayList;
import java.util.Collections;
@@ -33,7 +35,7 @@ import java.util.List;
public class OffloadingStrategyFactory {
- private static final int HISTORY_QUEUE_SIZE_FACTOR = 5;
+ private static final int HISTORY_QUEUE_SIZE_FACTOR = 1;
public List<OffloadingStrategy<?>> select(){
@@ -51,6 +53,17 @@ public class OffloadingStrategyFactory {
}
}
+ /**
+  * Resolves the selection strategy configured for automatic offloading.
+  *
+  * @return a {@link PrioritySelectionStrategy} when the node is configured with PRIO; otherwise a
+  *         {@link RandomSelectionStrategy}, which is also the fallback when no (or no recognised)
+  *         selection strategy is configured
+  */
+ private SelectionStrategy getAutoOffloadingSelectionStrategy() {
+   // Read the setting once; it is null when unset or when the configured name was not recognised.
+   SelectionStrategyType configured = NodeConfiguration.getAutoOffloadingSelectionStrategy();
+   if (configured == null) {
+     // A switch on a null enum reference throws NullPointerException before reaching 'default'.
+     return new RandomSelectionStrategy();
+   }
+   switch (configured) {
+     case PRIO:
+       return new PrioritySelectionStrategy();
+     case RANDOM:
+     default:
+       return new RandomSelectionStrategy();
+   }
+ }
+
private OffloadingStrategy<Float> getDebugOffloadingPolicy() {
return new OffloadingStrategy<Float>(
new ThresholdViolationOffloadingPolicy<>(
@@ -59,7 +72,7 @@ public class OffloadingStrategyFactory {
0.5f,
1),
new CPULoadResourceProperty(),
- new RandomSelectionStrategy());
+ getAutoOffloadingSelectionStrategy());
}
private List<OffloadingStrategy<?>> getDefaultStrategy(){
@@ -78,7 +91,7 @@ public class OffloadingStrategyFactory {
NodeConfiguration.getAutoOffloadingThresholdInPercent(),
NodeConfiguration.getAutoOffloadingMaxNumViolations()),
new CPULoadResourceProperty(),
- new RandomSelectionStrategy());
+ getAutoOffloadingSelectionStrategy());
}
private OffloadingStrategy<Long> getDefaultMemoryOffloadingPolicy(){
@@ -92,7 +105,7 @@ public class OffloadingStrategyFactory {
threshold,
NodeConfiguration.getAutoOffloadingMaxNumViolations()),
new FreeMemoryResourceProperty(),
- new RandomSelectionStrategy());
+ getAutoOffloadingSelectionStrategy());
}
private OffloadingStrategy<Long> getDefaultDiskSpaceOffloadingPolicy(){
@@ -106,6 +119,6 @@ public class OffloadingStrategyFactory {
threshold,
NodeConfiguration.getAutoOffloadingMaxNumViolations()),
new FreeDiskSpaceResourceProperty(),
- new RandomSelectionStrategy());
+ getAutoOffloadingSelectionStrategy());
}
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/SelectionStrategyType.java
similarity index 65%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
copy to streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/SelectionStrategyType.java
index 54ed1f8..5e01b92 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/SelectionStrategyType.java
@@ -15,13 +15,24 @@
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.management.offloading.strategies;
-package org.apache.streampipes.node.controller.management.offloading.strategies.selection;
+public enum SelectionStrategyType {
+ RANDOM("random"),
+ PRIO("prio");
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+ String name;
-import java.util.List;
+ SelectionStrategyType(String name) {
+ this.name = name;
+ }
-public interface SelectionStrategy {
- InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities);
+ /**
+  * Looks up a selection strategy type by its configuration name, ignoring case.
+  *
+  * @param name configured strategy name, e.g. "random" or "prio"; may be null
+  * @return the matching type, or null when {@code name} is null or matches no type
+  */
+ public static SelectionStrategyType fromString(String name) {
+   for (SelectionStrategyType type : SelectionStrategyType.values()) {
+     // equalsIgnoreCase already ignores case and returns false for null, so no toLowerCase() call
+     // (which would throw NullPointerException on a null name) is needed.
+     if (type.name.equalsIgnoreCase(name)) {
+       return type;
+     }
+   }
+   return null;
+ }
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index 95eb8ff..5ff48dc 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -29,12 +29,16 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
LoggerFactory.getLogger(ThresholdViolationOffloadingPolicy.class.getCanonicalName());
private Queue<T> history;
+ private final int length;
private final T threshold;
private final Comparator comparator;
private final int numberOfThresholdViolations;
+ private int numPreviousViolations = 0;
+
public ThresholdViolationOffloadingPolicy(int length, Comparator comparator, T threshold,
int numberOfThresholdViolations){
+ this.length = length;
this.history = new ArrayBlockingQueue<>(length);
this.comparator = comparator;
this.threshold = threshold;
@@ -54,16 +58,16 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
if(!this.history.offer(value)) {
this.history.poll();
this.history.offer(value);
- //Only for logging; can be removed later
- if(value.compareTo(this.threshold) > 0){
- int numViolations = 0;
- for(T val : this.history){
- if(val.compareTo(this.threshold) > 0){
- numViolations++;
- }
- }
- EvaluationLogger.getInstance().logMQTT("Offloading", "policy violation #" + numViolations);
- }
+ //TODO: Only for logging; can be removed later
+// if(value.compareTo(this.threshold) > 0){
+// int numViolations = 0;
+// for(T val : this.history){
+// if(val.compareTo(this.threshold) > 0){
+// numViolations++;
+// }
+// }
+// EvaluationLogger.getInstance().logMQTT("Offloading", "policy violation #" + numViolations);
+// }
}
}
@@ -86,6 +90,12 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
}
break;
}
+
+ if (numViolations > numPreviousViolations) {
+ EvaluationLogger.getInstance().logMQTT("Offloading", "policy violation #" + numViolations);
+ numPreviousViolations = numViolations;
+ }
+
if(numViolations >= this.numberOfThresholdViolations){
LOG.info("Threshold violation detected");
return true;
@@ -95,7 +105,10 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
@Override
public void reset() {
+ // Recreate the history with its configured capacity. history.size() is the number of elements
+ // currently queued, not the capacity: using it shrinks the window after a reset and makes
+ // ArrayBlockingQueue throw IllegalArgumentException when the queue is empty.
+ LOG.info("Reset history queue to length: " + this.length);
- this.history = new ArrayBlockingQueue<>(this.history.size());
+ this.history = new ArrayBlockingQueue<>(this.length);
+ LOG.info("Reset number previous violation counter from: " + numPreviousViolations + "-> to: " + 0);
+ this.numPreviousViolations = 0;
}
//TODO: Remove -- Only for debugging purposes
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
index e52ec72..ef6378a 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
@@ -19,12 +19,9 @@
package org.apache.streampipes.node.controller.management.offloading.strategies.selection;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.node.controller.config.NodeConfiguration;
-import org.apache.streampipes.node.controller.management.offloading.OffloadingPolicyManager;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
@@ -32,53 +29,29 @@ import java.util.stream.Collectors;
public class PrioritySelectionStrategy implements SelectionStrategy{
@Override
- public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
- List<InvocableStreamPipesEntity> runningPipelineElements = RunningInvocableInstances.INSTANCE.getAll();
-
- //List all other nodes that are online
- // TODO: this involves request to central backend (deactivated; assess if provides meaningful benefit)
- /*List<NodeInfoDescription> onlineNodes = getNodeInfos().stream().filter(desc ->
- !desc.getNodeControllerId().equals(NodeConfiguration.getNodeControllerId()))
- .collect(Collectors.toList());*/
-
- List<InvocableStreamPipesEntity> candidatesForOffloading = runningPipelineElements.stream()
- .filter(entity -> isNotBlacklisted(entity, blacklistedEntities))
- .filter(InvocableStreamPipesEntity::isPreemption)
- //.filter(entity -> verifyIfSupportedOnOtherNodes(entity, onlineNodes))
- .collect(Collectors.toList());
-
- if (candidatesForOffloading.isEmpty()) {
+ public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistInvocables) {
+ // Sinks cannot be migrated
+ List<InvocableStreamPipesEntity> candidateInvocables = RunningInvocableInstances.INSTANCE.getAll().stream()
+ .filter(e -> e instanceof DataProcessorInvocation)
+ .filter(entity -> isNotBlacklisted(entity, blacklistInvocables))
+ .filter(InvocableStreamPipesEntity::isPreemption)
+ .collect(Collectors.toList());
+ if(candidateInvocables.size() == 0)
return null;
- } else {
- return selectPipelineElementWithLowestPriorityScore(candidatesForOffloading);
- }
- }
-
- private List<NodeInfoDescription> getNodeInfos(){
- return OffloadingPolicyManager.getInstance().getOnlineNodes();
+ return selectWithLowestPrioScore(candidateInvocables);
}
- private InvocableStreamPipesEntity selectPipelineElementWithLowestPriorityScore(List<InvocableStreamPipesEntity> entities){
- // sort pipeline element priorityScores ascending
- List<InvocableStreamPipesEntity> sortedPipelineElements = entities.stream()
+ private InvocableStreamPipesEntity selectWithLowestPrioScore(List<InvocableStreamPipesEntity> candidateInvocables){
+ // sort pipeline element priorityScores ascending and return element with lowest score
+ return candidateInvocables.stream()
.sorted(Comparator.comparingInt(InvocableStreamPipesEntity::getPriorityScore))
- .collect(Collectors.toList());
-
- // return first element with lowest score
- return sortedPipelineElements.get(0);
- }
-
- private boolean verifyIfSupportedOnOtherNodes(InvocableStreamPipesEntity entity, List<NodeInfoDescription> nodeInfos){
- List<NodeInfoDescription> candidateNodes = new ArrayList<>();
- for(NodeInfoDescription nodeInfo : nodeInfos){
- if (nodeInfo.getSupportedElements().contains(entity.getAppId()))
- candidateNodes.add(nodeInfo);
- }
- return !candidateNodes.isEmpty();
+ .collect(Collectors.toList()).get(0);
}
- private boolean isNotBlacklisted(InvocableStreamPipesEntity entity, List<InvocableStreamPipesEntity> blacklist){
- return blacklist.stream().noneMatch(blacklistedEntity ->
- blacklistedEntity.getDeploymentRunningInstanceId().equals(entity.getDeploymentRunningInstanceId()));
+ private boolean isNotBlacklisted(InvocableStreamPipesEntity entity,
+ List<InvocableStreamPipesEntity> blacklistInvocables){
+ return blacklistInvocables.stream()
+ .noneMatch(blacklistEntity -> blacklistEntity
+ .getDeploymentRunningInstanceId().equals(entity.getDeploymentRunningInstanceId()));
}
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
index f89c9b5..dabe841 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
@@ -26,13 +26,14 @@ import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
-public class RandomSelectionStrategy implements SelectionStrategy{
+public class RandomSelectionStrategy implements SelectionStrategy {
@Override
- public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
+ public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistInvocables) {
//Sinks cannot be migrated atm
- List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll().stream().
- filter(e -> e instanceof DataProcessorInvocation).collect(Collectors.toList());
+ List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll().stream()
+ .filter(e -> e instanceof DataProcessorInvocation)
+ .collect(Collectors.toList());
if(instances.size() == 0)
return null;
return instances.get(new Random().nextInt(instances.size()));
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
index 54ed1f8..e889c0d 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
@@ -23,5 +23,5 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import java.util.List;
public interface SelectionStrategy {
- InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities);
+ InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistInvocables);
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementReconfigurationHandler.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementReconfigurationHandler.java
index 979cdec..be5ea86 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementReconfigurationHandler.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementReconfigurationHandler.java
@@ -32,6 +32,8 @@ import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.node.controller.management.IHandler;
import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
import org.apache.streampipes.node.controller.utils.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -41,6 +43,8 @@ import java.util.Map;
public class PipelineElementReconfigurationHandler implements IHandler<Response> {
+ private static final Logger LOG = LoggerFactory.getLogger(PipelineElementReconfigurationHandler.class.getCanonicalName());
+
private static final String DOT = ".";
private static final String RECONFIGURATION_TOPIC = "org.apache.streampipes.control.event.reconfigure";
@@ -63,6 +67,8 @@ public class PipelineElementReconfigurationHandler implements IHandler<Response>
EventProducer pub = getReconfigurationEventProducer();
byte [] reconfigurationEvent = reconfigurationToByteArray();
+
+ LOG.info("Publish reconfiguration event to pipeline element {}", graph.getDeploymentRunningInstanceId());
pub.publish(reconfigurationEvent);
pub.disconnect();
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/DataStreamRelayManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/DataStreamRelayManager.java
index ef8f07a..d2ad5a1 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/DataStreamRelayManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/DataStreamRelayManager.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.node.controller.management.relay;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
@@ -36,6 +37,9 @@ public class DataStreamRelayManager {
private static DataStreamRelayManager instance = null;
+ // TODO: remove after tests
+ private static final EvaluationLogger logger = EvaluationLogger.getInstance();
+
private DataStreamRelayManager() {}
public static DataStreamRelayManager getInstance() {
@@ -59,6 +63,7 @@ public class DataStreamRelayManager {
Map<String, EventRelay> eventRelayMap = new HashMap<>();
+ logger.logMQTT("Migration", "node controller start relay", "");
// start data stream relay
// 1:1 mapping -> remote forward
// 1:n mapping -> remote fan-out
@@ -76,6 +81,7 @@ public class DataStreamRelayManager {
}
public Response stop(String id) {
+ logger.logMQTT("Migration", "node controller stop relay", "");
Map<String, EventRelay> relays = RunningRelayInstances.INSTANCE.get(id);
if (relays != null) {
relays.values().forEach(EventRelay::stop);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
index e812b02..d7cc49d 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
@@ -54,7 +54,7 @@ public class DockerStatsCollector {
public void run() {
LOG.debug("Create Docker stats scheduler");
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
- scheduledExecutorService.scheduleAtFixedRate(collect, 0, 1, TimeUnit.SECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(collect, 0, DOCKER_STATS_COLLECT_FREQ_SECS, TimeUnit.SECONDS);
Object[] header = new Object[]{
"timestamp",
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
index d76d7b6..27e8358 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/EdgeExtensionsGenericPerformanceTest.java
@@ -18,11 +18,14 @@
package org.apache.streampipes.performance;
import org.apache.streampipes.performance.performancetest.Test;
-import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class EdgeExtensionsGenericPerformanceTest {
+ private static final Logger LOG = LoggerFactory.getLogger(EdgeExtensionsGenericPerformanceTest.class.getCanonicalName());
+
public static void main(String ... args){
int numberOfRuns = Integer.parseInt(System.getenv("TEST_RUNS"));
@@ -31,6 +34,8 @@ public class EdgeExtensionsGenericPerformanceTest {
for(int i = 1; i <= numberOfRuns; i++){
if (i==numberOfRuns)
test.setStopPipeline(true);
+
+ LOG.info("Executing run: {}/{}", i, numberOfRuns);
test.execute(i);
}
}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 062c067..c9dae0b 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -18,7 +18,9 @@
package org.apache.streampipes.performance;
import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.apache.streampipes.performance.performancetest.CountBasedOffloadingTest;
import org.apache.streampipes.performance.performancetest.GenericTest;
+import org.apache.streampipes.performance.performancetest.PrioOffloadingTest;
import org.apache.streampipes.performance.performancetest.Test;
public class TestFactory {
@@ -45,6 +47,16 @@ public class TestFactory {
Object[] header_offloading = {"timestampInMillis", "deviceId", "event", "policy", "selectedProcessor"};
logger.logHeader("Offloading", header_offloading);
return getOffloadingTest();
+ case "OffloadingPrio":
+ Object[] header_offloading_multi = {"timestampInMillis", "deviceId", "event", "policy",
+ "selectedProcessor"};
+ logger.logHeader("Offloading", header_offloading_multi);
+ return getOffloadingPrioSelectionTest();
+ case "OffloadingCountBased":
+ Object[] header_offloading_count_based = {"timestampInMillis", "deviceId", "event", "policy",
+ "selectedProcessor"};
+ logger.logHeader("Offloading", header_offloading_count_based);
+ return getOffloadingCountBasedTest();
default:
throw new RuntimeException("No test configuration found.");
}
@@ -73,7 +85,16 @@ public class TestFactory {
public static Test getOffloadingTest(){
return new GenericTest(getPipelineName(), false, false,
- true, 20000, 1500000);
+ true, 300000, 5400000);
+ }
+
+    /**
+     * Builds the priority-based offloading test for the low- and high-priority pipelines
+     * whose names are read from the environment. The test is created with
+     * {@code stopPipeline = false} and 300000 as {@code timeToSleepBetweenSteps}.
+     *
+     * @return a new {@link PrioOffloadingTest}
+     */
+    public static Test getOffloadingPrioSelectionTest() {
+        final String lowPrioPipeline = getLowPrioPipelineName();
+        final String highPrioPipeline = getHighPrioPipelineName();
+        return new PrioOffloadingTest(lowPrioPipeline, highPrioPipeline, false, 300000);
+    }
+
+    /**
+     * Builds the count-based offloading test for the pipeline whose name is read from the
+     * environment. The test is created with {@code stopPipeline = false} and 300000 as
+     * {@code timeToSleepBetweenSteps}.
+     *
+     * @return a new {@link CountBasedOffloadingTest}
+     */
+    private static Test getOffloadingCountBasedTest() {
+        final String pipelineName = getPipelineName();
+        return new CountBasedOffloadingTest(pipelineName, false, 300000);
     }
//Helpers
@@ -83,4 +104,16 @@ public class TestFactory {
return pipelineName;
}
+    /**
+     * Reads the name of the low-priority pipeline from the
+     * {@code TEST_LOW_PRIO_PIPELINE_NAME} environment variable.
+     *
+     * @return the configured pipeline name
+     * @throws RuntimeException if the variable is not set
+     */
+    private static String getLowPrioPipelineName() {
+        final String lowPrioName = System.getenv("TEST_LOW_PRIO_PIPELINE_NAME");
+        if (lowPrioName != null) {
+            return lowPrioName;
+        }
+        throw new RuntimeException("No Pipeline Name provided.");
+    }
+
+    /**
+     * Reads the name of the high-priority pipeline from the
+     * {@code TEST_HIGH_PRIO_PIPELINE_NAME} environment variable.
+     *
+     * @return the configured pipeline name
+     * @throws RuntimeException if the variable is not set
+     */
+    private static String getHighPrioPipelineName() {
+        final String highPrioName = System.getenv("TEST_HIGH_PRIO_PIPELINE_NAME");
+        if (highPrioName != null) {
+            return highPrioName;
+        }
+        throw new RuntimeException("No Pipeline Name provided.");
+    }
+
}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/CountBasedOffloadingTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/CountBasedOffloadingTest.java
new file mode 100644
index 0000000..44f4dd7
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/CountBasedOffloadingTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance.performancetest;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.messaging.mqtt.MqttConsumer;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
+import org.apache.streampipes.model.grounding.TopicDefinition;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+
+public class CountBasedOffloadingTest implements Test {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CountBasedOffloadingTest.class.getCanonicalName());
+
+ private boolean stopPipeline;
+ private final long timeToSleepBetweenSteps;
+ private final StreamPipesClient client;
+ private final String pipelineName;
+ private Pipeline pipeline;
+ private final String[] cpuSteps;
+ private final int maxOffloadingActions;
+
+    /**
+     * Creates the count-based offloading test and resolves everything it needs up front:
+     * the StreamPipes client, the pipeline with the given name, the CPU load steps and the
+     * maximum number of offloading actions.
+     *
+     * @param pipelineName            name of the pipeline to look up and drive
+     * @param stopPipeline            whether {@link #execute(int)} stops the pipeline at its end
+     * @param timeToSleepBetweenSteps pause after each CPU load step, handed to {@code Thread.sleep}
+     * @throws RuntimeException      if the pipeline cannot be found or no CPU load steps are configured
+     * @throws NumberFormatException if {@code SP_PORT} or a set {@code MAX_OFFLOADING_ACTIONS}
+     *                               is not an integer
+     */
+    public CountBasedOffloadingTest(String pipelineName,
+                                    boolean stopPipeline,
+                                    long timeToSleepBetweenSteps) {
+        this.pipelineName = pipelineName;
+        this.stopPipeline = stopPipeline;
+        this.timeToSleepBetweenSteps = timeToSleepBetweenSteps;
+
+        // The client has to exist before the pipeline lookup, which goes through it.
+        this.client = createStreamPipesClient();
+        this.pipeline = findPipelineByName(pipelineName);
+        this.cpuSteps = loadFromEnv();
+
+        // MAX_OFFLOADING_ACTIONS is optional; 50 is used when it is not set.
+        final String maxActionsFromEnv = System.getenv("MAX_OFFLOADING_ACTIONS");
+        if (maxActionsFromEnv == null) {
+            this.maxOffloadingActions = 50;
+        } else {
+            this.maxOffloadingActions = Integer.parseInt(maxActionsFromEnv);
+        }
+    }
+
+    /**
+     * Sets whether {@link #execute(int)} stops the pipeline once the run is finished.
+     *
+     * @param stopPipeline {@code true} to stop the pipeline at the end of a run
+     */
+    @Override
+    public void setStopPipeline(boolean stopPipeline) {
+        this.stopPipeline = stopPipeline;
+    }
+
+    /**
+     * Runs one count-based offloading experiment.
+     *
+     * Starts the pipeline if it is not running, launches the CPU ramp-up thread and the
+     * offloading observer thread and waits for both, then reconfigures the pipeline to the
+     * first configured CPU step, sleeps for a final five-minute segment and, if requested,
+     * stops the pipeline.
+     *
+     * An interruption of any wait is reported through the logger and merely cuts that wait
+     * short; the remaining steps still run so the load is reset and the pipeline can be stopped.
+     *
+     * @param nrRuns index of the current run; not used by this test
+     */
+    @Override
+    public void execute(int nrRuns) {
+
+        // Start the pipeline on the first run only
+        if (!pipeline.isRunning()) {
+            startPipeline();
+            // provide some time prior to reconfigure values
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while waiting after pipeline start", e);
+            }
+        }
+
+        Thread rampUpThread = beginRampUpAndOffloadingInterval();
+
+        // The observer interrupts the ramp-up thread once enough offloadings were seen
+        Thread offloadingObserverThread = offloadingObserverThread(rampUpThread);
+        rampUpThread.start();
+        offloadingObserverThread.start();
+
+        try {
+            LOG.info("Wait until max. number offloadings is completed: {}", maxOffloadingActions);
+            rampUpThread.join();
+            offloadingObserverThread.join();
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for ramp-up and observer threads", e);
+        }
+
+        // reconfigure to base load and run for 5min
+        adaptPipelineCpuLoad();
+
+        Duration finalSegment = Duration.ofMinutes(5);
+        try {
+            LOG.info("Sleep for final segment of offloading test: {} min", finalSegment.toMinutes());
+            Thread.sleep(finalSegment.toMillis());
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted during final segment of offloading test", e);
+        }
+
+        // Stop the pipeline only when requested (last run)
+        if (stopPipeline && pipeline.isRunning()) {
+            stopPipeline(pipeline);
+        }
+    }
+
+
+ // Helpers
+
+    /**
+     * Re-reads the pipeline by name and reconfigures its {@code load} property to the first
+     * configured CPU step ({@code cpuSteps[0]}).
+     */
+    private void adaptPipelineCpuLoad() {
+        // Fetch the pipeline again before reconfiguring it
+        pipeline = findPipelineByName(pipelineName);
+
+        final String pipelineLabel = pipeline.getName();
+        final String burnerNode = findDeploymentTargetByProcessorName(pipeline, "CPU Burner");
+        LOG.info("Prepare and reconfigure pipeline {} with {} on node {}",
+                pipelineLabel, "CPU Burner", burnerNode);
+
+        final String baseLoad = cpuSteps[0];
+        prepareAndReconfigurePipeline(pipeline, "load", baseLoad);
+    }
+
+    /**
+     * Starts the pipeline through the client, logs the returned status title together with
+     * the node of the "CPU Burner" processor and marks the local pipeline object as running
+     * when the start succeeded.
+     */
+    private void startPipeline() {
+        final PipelineOperationStatus status = client.pipelines().start(pipeline);
+        final String burnerNode = findDeploymentTargetByProcessorName(pipeline, "CPU Burner");
+
+        LOG.info("Start status pipeline {} with {} on node {}", status.getTitle(), "CPU Burner", burnerNode);
+
+        if (!status.isSuccess()) {
+            return;
+        }
+        pipeline.setRunning(true);
+    }
+
+    /**
+     * Creates the StreamPipes client from the environment: credentials from {@code SP_USER}
+     * and {@code SP_API_KEY}, endpoint from {@code SP_HOST} and {@code SP_PORT}.
+     * The trailing {@code true} flag is passed on to {@code StreamPipesClient.create} unchanged;
+     * its meaning is not visible here - TODO confirm against the client API.
+     *
+     * @return the client instance
+     * @throws NumberFormatException if {@code SP_PORT} is missing or not an integer
+     */
+    private StreamPipesClient createStreamPipesClient() {
+        final String user = System.getenv("SP_USER");
+        final String apiKey = System.getenv("SP_API_KEY");
+        final StreamPipesCredentials credentials = StreamPipesCredentials.from(user, apiKey);
+
+        final String host = System.getenv("SP_HOST");
+        final int port = Integer.parseInt(System.getenv("SP_PORT"));
+        return StreamPipesClient.create(host, port, credentials, true);
+    }
+
+    /**
+     * Writes the new value into the matching reconfigurable property of the pipeline and
+     * then submits the pipeline for reconfiguration.
+     *
+     * @param pipeline                pipeline to modify and reconfigure
+     * @param fstInternalName         internal name of the free-text property to change
+     * @param fstReconfigurationValue value to set
+     */
+    private void prepareAndReconfigurePipeline(Pipeline pipeline,
+                                               String fstInternalName,
+                                               String fstReconfigurationValue) {
+        // Step 1: update the local pipeline model
+        preparingPipeline(pipeline, fstInternalName, fstReconfigurationValue);
+        // Step 2: push the updated model through the client
+        reconfiguringPipeline(pipeline, fstReconfigurationValue);
+    }
+
+    /**
+     * Sets {@code fstReconfigurationValue} on every reconfigurable free-text static property
+     * named {@code fstInternalName}, across all processors of the pipeline. Only the local
+     * pipeline object is changed here.
+     *
+     * @param pipeline                pipeline whose processors are updated
+     * @param fstInternalName         internal name of the property to change
+     * @param fstReconfigurationValue new value; also parsed as a float for the log message
+     * @throws NumberFormatException if the value is not a parsable float
+     */
+    private void preparingPipeline(Pipeline pipeline, String fstInternalName, String fstReconfigurationValue) {
+        final String pipelineLabel = pipeline.getName();
+        final float loadPercent = Float.parseFloat(fstReconfigurationValue) * 100;
+        LOG.info("Set CPU load for {} pipeline to: {} percent", pipelineLabel, loadPercent);
+
+        for (DataProcessorInvocation processor : pipeline.getSepas()) {
+            processor.getStaticProperties().stream()
+                    .filter(FreeTextStaticProperty.class::isInstance)
+                    .map(FreeTextStaticProperty.class::cast)
+                    .filter(FreeTextStaticProperty::isReconfigurable)
+                    .filter(property -> property.getInternalName().equals(fstInternalName))
+                    .forEach(property -> property.setValue(fstReconfigurationValue));
+        }
+    }
+
+    /**
+     * Submits the pipeline for reconfiguration and logs the outcome.
+     *
+     * The success message is logged only when the returned status reports success; a failed
+     * reconfiguration is logged as a warning together with the status title.
+     *
+     * @param pipeline                pipeline to reconfigure
+     * @param fstReconfigurationValue value that was set, used for logging only
+     */
+    private void reconfiguringPipeline(Pipeline pipeline, String fstReconfigurationValue) {
+        LOG.info("Reconfiguration triggered with value {}", fstReconfigurationValue);
+        PipelineOperationStatus statusReconfiguration = client.pipelines().reconfigure(pipeline);
+        if (statusReconfiguration.isSuccess()) {
+            LOG.info("Pipeline {} successfully reconfigured", pipeline.getName());
+        } else {
+            LOG.warn("Pipeline {} could not be reconfigured: {}",
+                    pipeline.getName(), statusReconfiguration.getTitle());
+        }
+    }
+
+    /**
+     * Stops the given pipeline through the client and logs the outcome. On success the
+     * pipeline held by this test is marked as not running.
+     *
+     * @param pipeline pipeline to stop
+     */
+    private void stopPipeline(Pipeline pipeline) {
+        PipelineOperationStatus stopStatus = client.pipelines().stop(pipeline);
+        if (stopStatus.isSuccess()) {
+            LOG.info("Stop status pipeline: {}", stopStatus.getTitle());
+            this.pipeline.setRunning(false);
+        } else {
+            // Pass the name as an argument so the {} placeholder is actually filled
+            LOG.info("Pipeline {} could not be stopped.", stopStatus.getPipelineName());
+        }
+    }
+
+    /**
+     * Fetches all pipelines through the client and returns the first one with the given name.
+     *
+     * @param pipelineName name to look for
+     * @return the first pipeline whose name equals {@code pipelineName}
+     * @throws RuntimeException if no pipeline has that name
+     */
+    private Pipeline findPipelineByName(String pipelineName) {
+        for (Pipeline candidate : client.pipelines().all()) {
+            if (candidate.getName().equals(pipelineName)) {
+                return candidate;
+            }
+        }
+        throw new RuntimeException("Pipeline not found");
+    }
+
+    /**
+     * Returns the deployment target node id of the first processor in the pipeline that has
+     * the given name.
+     *
+     * @param pipeline      pipeline whose processors are searched
+     * @param processorName processor name to look for
+     * @return the deployment target node id of the matching processor
+     * @throws RuntimeException if the pipeline has no processor with that name
+     */
+    private String findDeploymentTargetByProcessorName(Pipeline pipeline, String processorName) {
+        for (DataProcessorInvocation processor : pipeline.getSepas()) {
+            if (processor.getName().equals(processorName)) {
+                return processor.getDeploymentTargetNodeId();
+            }
+        }
+        throw new RuntimeException("Processor not found");
+    }
+
+    /**
+     * Creates (but does not start) the thread that watches the "Offloading" MQTT topic.
+     *
+     * Each received event is split on commas. For events containing the field
+     * "offloading done", the offloading count of the device in field 1 is increased. Once
+     * every device seen so far has reached {@code maxOffloadingActions}, the ramp-up thread
+     * is interrupted and the consumer is closed.
+     *
+     * @param rampUpThread thread to interrupt when the offloading limit is reached
+     * @return the not yet started observer thread
+     */
+    private Thread offloadingObserverThread(Thread rampUpThread) {
+        LOG.info("Offloading observer thread started");
+        return new Thread() {
+            public synchronized void run() {
+
+                // offloading count per origin device id
+                Map<String, Integer> offloadingsPerDevice = new HashMap<>();
+
+                String brokerUrl = System.getenv("SP_LOGGING_MQTT_URL");
+                try {
+                    MqttTransportProtocol protocol = makeMqttTransportProtocol(new URI(brokerUrl), "Offloading");
+
+                    MqttConsumer consumer = new MqttConsumer();
+                    consumer.connect(protocol, event -> {
+                        String[] fields = new String(event, StandardCharsets.UTF_8).split(",");
+                        if (!Arrays.asList(fields).contains("offloading done")) {
+                            return;
+                        }
+
+                        LOG.info("Pipeline element offloaded {}", fields[4]);
+
+                        // Increase the counter of the originating device, starting at 1
+                        String originDeviceId = fields[1];
+                        int currentCount = offloadingsPerDevice.merge(originDeviceId, 1, Integer::sum);
+                        LOG.info("Current number of offloadings for {}: {}/{}",
+                                originDeviceId, currentCount, maxOffloadingActions);
+
+                        boolean limitReached = offloadingsPerDevice.values()
+                                .stream()
+                                .allMatch(count -> count >= maxOffloadingActions);
+                        if (!limitReached) {
+                            return;
+                        }
+
+                        LOG.info("Reached max offloading actions");
+                        rampUpThread.interrupt();
+                        try {
+                            consumer.close();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    });
+                } catch (URISyntaxException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+    }
+
+    /**
+     * Creates (but does not start) the thread that ramps up the CPU load.
+     *
+     * When more than one CPU step is configured, the thread applies every step once, in
+     * order, to the {@code load} property of the pipeline and sleeps
+     * {@code timeToSleepBetweenSteps} after each step. From the second step on the pipeline
+     * is re-read by name first. Afterwards the thread stays alive until it is interrupted.
+     *
+     * While idle the thread sleeps in short intervals instead of spinning, so the test
+     * driver does not burn a CPU core while waiting for the interrupt.
+     * NOTE(review): with a single configured step nothing is applied at all, as before -
+     * confirm this is intended.
+     *
+     * @return the not yet started ramp-up thread
+     */
+    private Thread beginRampUpAndOffloadingInterval() {
+        LOG.info("Initial ramp up interval thread started");
+        return new Thread() {
+            @Override
+            public synchronized void run() {
+                // pause between two interrupt checks once there is nothing left to apply
+                final long idlePollMillis = 1000;
+                int counter = 0;
+                while (!Thread.currentThread().isInterrupted()) {
+                    try {
+                        if (cpuSteps.length > 1 && counter != cpuSteps.length) {
+                            for (String cpuStep : cpuSteps) {
+
+                                if (counter > 0) {
+                                    LOG.info("Update pipeline locations");
+                                    pipeline = findPipelineByName(pipelineName);
+                                }
+
+                                String node = findDeploymentTargetByProcessorName(pipeline, "CPU Burner");
+
+                                LOG.info("Prepare and reconfigure pipeline {} with {} on node {}",
+                                        pipeline.getName(),
+                                        "CPU Burner",
+                                        node);
+                                prepareAndReconfigurePipeline(pipeline, "load", cpuStep);
+                                counter++;
+
+                                Thread.sleep(timeToSleepBetweenSteps);
+                            }
+                        } else {
+                            // All steps applied (or none to apply): wait for the interrupt
+                            Thread.sleep(idlePollMillis);
+                        }
+                    } catch (InterruptedException e) {
+                        LOG.info("Interrupted cpu steps thread");
+                        // Restore the flag so the loop condition ends the thread
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds an MQTT transport protocol description from the host and port of the given URI
+     * and a simple topic definition for the given topic.
+     *
+     * @param uri   broker address; only host and port are read
+     * @param topic topic name
+     * @return the populated transport protocol
+     */
+    private MqttTransportProtocol makeMqttTransportProtocol(URI uri, String topic) {
+        final MqttTransportProtocol protocol = new MqttTransportProtocol();
+        final TopicDefinition topicDefinition = new SimpleTopicDefinition(topic);
+
+        protocol.setBrokerHostname(uri.getHost());
+        protocol.setPort(uri.getPort());
+        protocol.setTopicDefinition(topicDefinition);
+
+        return protocol;
+    }
+
+    /**
+     * Reads the CPU load steps from the {@code OFFLOADING_CPU_LOAD_STEPS} environment
+     * variable, split on {@code ;}.
+     *
+     * @return the configured steps in the order given
+     * @throws RuntimeException if the variable is not set
+     */
+    private String[] loadFromEnv() {
+        final String rawSteps = System.getenv("OFFLOADING_CPU_LOAD_STEPS");
+        if (rawSteps == null) {
+            throw new RuntimeException("CPU load steps must be provided");
+        }
+        return rawSteps.split(";");
+    }
+
+}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 25e7a1a..6921114 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -25,7 +25,6 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
-
import java.util.List;
import java.util.Optional;
@@ -41,8 +40,16 @@ public class GenericTest implements Test{
private final EvaluationLogger evalLogger = EvaluationLogger.getInstance();
private float reconfigurableValue = 1;
- public GenericTest(String pipelineName, boolean stopPipeline, boolean shouldBeMigrated, boolean shouldBeReconfigured,
- long timeToSleepBeforeManipulation, long timeToSleepAfterManipulation){
+ private final String pipelineName;
+
+ public GenericTest(String pipelineName,
+ boolean stopPipeline,
+ boolean shouldBeMigrated,
+ boolean shouldBeReconfigured,
+ long timeToSleepBeforeManipulation,
+ long timeToSleepAfterManipulation){
+
+ this.pipelineName = pipelineName;
this.stopPipeline = stopPipeline;
this.shouldBeMigrated = shouldBeMigrated;
this.shouldBeReconfigured = shouldBeReconfigured;
@@ -69,7 +76,13 @@ public class GenericTest implements Test{
public void execute(int nrRuns) {
String testType = System.getenv("TEST_TYPE");
- String offloadingThreshold = System.getenv("OFFLOADING_THRESHOLD");
+
+ String[] cpuLoadTargets = System.getenv("OFFLOADING_CPU_LOAD_STEPS") != null ? System.getenv(
+ "OFFLOADING_CPU_LOAD_STEPS").split(";") : null;
+ String cpuEndLoad = System.getenv("OFFLOADING_CPU_LOAD_END") != null ?
+ System.getenv("OFFLOADING_CPU_LOAD_END") : "0.2";
+
+ // 0.5;0.9
Object[] line = null;
@@ -115,32 +128,56 @@ public class GenericTest implements Test{
}
//Reconfiguration
if (shouldBeReconfigured) {
- if (testType.equals("Reconfiguration"))
- pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
- .filter(FreeTextStaticProperty.class::isInstance)
- .map(FreeTextStaticProperty.class::cast)
- .filter(FreeTextStaticProperty::isReconfigurable)
- .forEach(sp -> {
- if (sp.getInternalName().equals("i-am-reconfigurable")) {
- sp.setValue(Float.toString(this.reconfigurableValue++));
- }
- }));
- else if (testType.equals("Offloading"))
+ if (testType.equals("Reconfiguration")) {
pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
.filter(FreeTextStaticProperty.class::isInstance)
.map(FreeTextStaticProperty.class::cast)
.filter(FreeTextStaticProperty::isReconfigurable)
.forEach(sp -> {
- if (sp.getInternalName().equals("load")) {
- sp.setValue(offloadingThreshold != null ? offloadingThreshold : "0.9");
+ if (sp.getInternalName().equals("i-am-reconfigurable")) {
+ sp.setValue(Float.toString(this.reconfigurableValue++));
}
}));
- line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
- System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
- PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
- System.out.println(message.getTitle());
- if (!message.isSuccess()) {
- line[line.length -1] = false;
+
+ line = reconfigurePipeline(pipeline, nrRuns);
+ }
+ else if (testType.equals("Offloading")) {
+
+ // when having load profiles
+ if (cpuLoadTargets != null && cpuLoadTargets.length > 1) {
+ for (String load : cpuLoadTargets) {
+ System.out.printf("Set CPU load to: %f percent", Float.parseFloat(load) * 100);
+ pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+ .filter(FreeTextStaticProperty.class::isInstance)
+ .map(FreeTextStaticProperty.class::cast)
+ .filter(FreeTextStaticProperty::isReconfigurable)
+ .forEach(sp -> {
+ if (sp.getInternalName().equals("load")) {
+ sp.setValue(load);
+ }
+ }));
+
+ line = reconfigurePipeline(pipeline, nrRuns);
+
+ try {
+ Thread.sleep(timeToSleepBeforeManipulation);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ } else {
+ pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+ .filter(FreeTextStaticProperty.class::isInstance)
+ .map(FreeTextStaticProperty.class::cast)
+ .filter(FreeTextStaticProperty::isReconfigurable)
+ .forEach(sp -> {
+ if (sp.getInternalName().equals("load")) {
+ sp.setValue("0.9");
+ }
+ }));
+
+ line = reconfigurePipeline(pipeline, nrRuns);
+ }
}
}
@@ -149,7 +186,33 @@ public class GenericTest implements Test{
long sleepTime = timeToSleepAfterManipulation + (long)(Math.random()*10000);
Thread.sleep(sleepTime);
}else {
+
Thread.sleep(timeToSleepAfterManipulation);
+
+ if (shouldBeReconfigured) {
+ // Reset to baseload
+ if (testType.equals("Offloading")) {
+ System.out.printf("Set CPU load to: %f percent", Float.parseFloat(cpuEndLoad) * 100);
+
+ List<Pipeline> pipelines = client.pipelines().all();
+ Pipeline pipelineLatest = pipelines.stream()
+ .filter(p -> p.getName().equals(pipelineName))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("Pipeline not found"));
+
+ pipelineLatest.getSepas().forEach(p -> p.getStaticProperties().stream()
+ .filter(FreeTextStaticProperty.class::isInstance)
+ .map(FreeTextStaticProperty.class::cast)
+ .filter(FreeTextStaticProperty::isReconfigurable)
+ .forEach(sp -> {
+ if (sp.getInternalName().equals("load")) {
+ sp.setValue(cpuEndLoad);
+ }
+ }));
+ line = reconfigurePipeline(pipelineLatest, nrRuns);
+ Thread.sleep(timeToSleepBeforeManipulation);
+ }
+ }
}
} catch (InterruptedException e) {
e.printStackTrace();
@@ -176,6 +239,17 @@ public class GenericTest implements Test{
}
}
+    /**
+     * Submits the pipeline for reconfiguration, prints the triggered value and the status
+     * title to stdout and builds the matching log line.
+     *
+     * @param pipeline pipeline to reconfigure
+     * @param nrRuns   index of the current run, copied into the log line
+     * @return {@code {"Reconfiguration triggered", nrRuns, reconfigurableValue - 1, success}}
+     */
+    private Object[] reconfigurePipeline(Pipeline pipeline, int nrRuns) {
+        final float reportedValue = this.reconfigurableValue - 1;
+        System.out.println("Reconfiguration triggered with value " + reportedValue);
+
+        final PipelineOperationStatus status = client.pipelines().reconfigure(pipeline);
+        System.out.println(status.getTitle());
+
+        // The last element carries the outcome of the reconfiguration call
+        return new Object[]{"Reconfiguration triggered", nrRuns, reportedValue, status.isSuccess()};
+    }
+
private void executeReconfiguration(){
if (!pipeline.isRunning()) {
PipelineOperationStatus startMessage = client.pipelines().start(pipeline);
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/PrioOffloadingTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/PrioOffloadingTest.java
new file mode 100644
index 0000000..8bd4469
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/PrioOffloadingTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance.performancetest;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.messaging.mqtt.MqttConsumer;
+import org.apache.streampipes.model.Tuple2;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
+import org.apache.streampipes.model.grounding.TopicDefinition;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class PrioOffloadingTest implements Test {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PrioOffloadingTest.class.getCanonicalName());
+
+ private boolean stopPipeline;
+ private final long timeToSleepBetweenSteps;
+ private final StreamPipesClient client;
+
+ private final String pipelineNameLow;
+ private final String pipelineNameHigh;
+
+ private Pipeline pipelineLowPrio;
+ private Pipeline pipelineHighPrio;
+
+ private final List<Tuple2<String, String>> cpuSteps;
+
+    /**
+     * Creates the priority-based offloading test and resolves the StreamPipes client, both
+     * pipelines and the CPU load steps up front.
+     *
+     * @param pipelineNameLow         name of the low-priority pipeline
+     * @param pipelineNameHigh        name of the high-priority pipeline
+     * @param stopPipeline            whether {@link #execute(int)} stops the pipelines at its end
+     * @param timeToSleepBetweenSteps pause between CPU load steps
+     * @throws RuntimeException if one of the pipelines cannot be found
+     */
+    public PrioOffloadingTest(String pipelineNameLow,
+                              String pipelineNameHigh,
+                              boolean stopPipeline,
+                              long timeToSleepBetweenSteps) {
+        this.pipelineNameLow = pipelineNameLow;
+        this.pipelineNameHigh = pipelineNameHigh;
+        this.stopPipeline = stopPipeline;
+        this.timeToSleepBetweenSteps = timeToSleepBetweenSteps;
+
+        // The client has to exist before the pipeline lookups, which go through it.
+        this.client = createStreamPipesClient();
+        this.pipelineLowPrio = findPipelineByName(pipelineNameLow);
+        this.pipelineHighPrio = findPipelineByName(pipelineNameHigh);
+
+        this.cpuSteps = loadFromEnv();
+    }
+
+    /**
+     * Sets whether {@link #execute(int)} stops both pipelines once the run is finished.
+     *
+     * @param stopPipeline {@code true} to stop the pipelines at the end of a run
+     */
+    @Override
+    public void setStopPipeline(boolean stopPipeline) {
+        this.stopPipeline = stopPipeline;
+    }
+
+    /**
+     * Runs one priority-based offloading experiment.
+     *
+     * Starts both pipelines if neither is running, launches the initial CPU step thread and
+     * the offloading observer thread and waits for both. It then reconfigures the
+     * high-priority pipeline, sleeps for what is left of a 30-minute interval minus a
+     * 5-minute final segment, reduces the load of both pipelines, sleeps for the final five
+     * minutes and, if requested, stops both pipelines.
+     *
+     * If waiting for the offloading already took longer than 25 minutes, the remaining
+     * duration would be negative; it is clamped to zero because {@code Thread.sleep} rejects
+     * negative values with an {@code IllegalArgumentException}.
+     *
+     * An interruption of any wait is reported through the logger and merely cuts that wait
+     * short; the remaining steps still run.
+     *
+     * @param nrRuns index of the current run; not used by this test
+     */
+    @Override
+    public void execute(int nrRuns) {
+
+        Instant start = Instant.now();
+
+        // Start both pipelines only when neither is running yet
+        if (!pipelineLowPrio.isRunning() && !pipelineHighPrio.isRunning()) {
+            startingPipelines();
+            // provide some time prior to reconfigure values
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while waiting after pipeline start", e);
+            }
+        }
+
+        Thread initialCpuStepThread = getInitialCpuStepThread();
+        // start offloading observer thread
+        Thread offloadingObserverThread = offloadingObserverThread(initialCpuStepThread);
+
+        initialCpuStepThread.start();
+        offloadingObserverThread.start();
+
+        try {
+            LOG.info("Wait for offloading to take place");
+            initialCpuStepThread.join();
+            offloadingObserverThread.join();
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for offloading to take place", e);
+        }
+
+        Instant end = Instant.now();
+        Duration elapsed = Duration.between(start, end);
+        Duration remaining = Duration.ofMinutes(30).minus(elapsed.plus(Duration.ofMinutes(5)));
+        if (remaining.isNegative()) {
+            // More than 25 min already elapsed: skip the wait instead of failing in Thread.sleep
+            LOG.warn("Offloading interval already exceeded by {} s, skipping remaining wait",
+                    remaining.negated().getSeconds());
+            remaining = Duration.ZERO;
+        }
+
+        adaptHighPrioPipelineCpuLoad();
+
+        try {
+            LOG.info("Sleep for remaining duration of offloading interval: {} min", remaining.toMinutes());
+            Thread.sleep(remaining.toMillis());
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted during remaining offloading interval", e);
+        }
+
+        adaptHighLowPrioPipelineCpuLoad();
+
+        Duration finalSegment = Duration.ofMinutes(5);
+        try {
+            LOG.info("Sleep for final segment of offloading test: {} min", finalSegment.toMinutes());
+            Thread.sleep(finalSegment.toMillis());
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted during final segment of offloading test", e);
+        }
+
+        // Stop both pipelines only when requested (last run)
+        if (stopPipeline && pipelineLowPrio.isRunning() && pipelineHighPrio.isRunning()) {
+            stoppingPipeline(pipelineLowPrio);
+            stoppingPipeline(pipelineHighPrio);
+        }
+    }
+
+
+ // Helpers
+
+    /**
+     * Re-reads the high-priority pipeline by name and reconfigures its {@code load} property
+     * to the {@code b} component of the second configured CPU step ({@code cpuSteps.get(1)}).
+     * The low-priority pipeline is not touched.
+     */
+    private void adaptHighPrioPipelineCpuLoad() {
+        // Fetch the high prio pipeline again before reconfiguring it
+        pipelineHighPrio = findPipelineByName(pipelineNameHigh);
+
+        final String highPrioLabel = pipelineHighPrio.getName();
+        final String highPrioNode = findDeploymentTargetByProcessorName(pipelineHighPrio, "CPU Burner");
+        LOG.info("Prepare and reconfigure high prio pipeline {} with {} on node {}",
+                highPrioLabel, "CPU Burner", highPrioNode);
+
+        final String highPrioLoad = cpuSteps.get(1).b;
+        prepareAndReconfigurePipeline(pipelineHighPrio, "load", highPrioLoad);
+    }
+
+    /**
+     * Re-reads both pipelines by name and reconfigures their {@code load} property to the
+     * first configured CPU step: component {@code a} for the low-priority pipeline,
+     * component {@code b} for the high-priority pipeline. The low-priority pipeline is
+     * reconfigured first.
+     */
+    private void adaptHighLowPrioPipelineCpuLoad() {
+        LOG.info("Reduce CPU load for both high and low prio pipeline");
+
+        // Fetch both pipelines again before reconfiguring them
+        pipelineLowPrio = findPipelineByName(pipelineNameLow);
+        pipelineHighPrio = findPipelineByName(pipelineNameHigh);
+
+        // Low prio pipeline
+        final String lowPrioLabel = pipelineLowPrio.getName();
+        final String lowPrioNode = findDeploymentTargetByProcessorName(pipelineLowPrio, "CPU Burner");
+        LOG.info("Prepare and reconfigure low prio pipeline {} with {} on node {}",
+                lowPrioLabel, "CPU Burner", lowPrioNode);
+        final String lowPrioLoad = cpuSteps.get(0).a;
+        prepareAndReconfigurePipeline(pipelineLowPrio, "load", lowPrioLoad);
+
+        // High prio pipeline
+        final String highPrioLabel = pipelineHighPrio.getName();
+        final String highPrioNode = findDeploymentTargetByProcessorName(pipelineHighPrio, "CPU Burner");
+        LOG.info("Prepare and reconfigure high prio pipeline {} with {} on node {}",
+                highPrioLabel, "CPU Burner", highPrioNode);
+        final String highPrioLoad = cpuSteps.get(0).b;
+        prepareAndReconfigurePipeline(pipelineHighPrio, "load", highPrioLoad);
+    }
+
+    /**
+     * Starts the low- and the high-priority pipeline (in that order) through the client,
+     * logs each returned status title together with the node of the "CPU Burner" processor
+     * and marks each local pipeline object as running when its start succeeded.
+     */
+    private void startingPipelines() {
+        final PipelineOperationStatus lowStatus = client.pipelines().start(pipelineLowPrio);
+        final PipelineOperationStatus highStatus = client.pipelines().start(pipelineHighPrio);
+
+        final String lowNode = findDeploymentTargetByProcessorName(pipelineLowPrio, "CPU Burner");
+        final String highNode = findDeploymentTargetByProcessorName(pipelineHighPrio, "CPU Burner");
+
+        // Low prio pipeline
+        LOG.info("Start status low prio pipeline {} with {} on node {}",
+                lowStatus.getTitle(), "CPU Burner", lowNode);
+        if (lowStatus.isSuccess()) {
+            pipelineLowPrio.setRunning(true);
+        }
+
+        // High prio pipeline
+        LOG.info("Start status high prio pipeline {} with {} on node {}",
+                highStatus.getTitle(), "CPU Burner", highNode);
+        if (highStatus.isSuccess()) {
+            pipelineHighPrio.setRunning(true);
+        }
+    }
+
+    /**
+     * Creates the StreamPipes client from the environment: credentials from {@code SP_USER}
+     * and {@code SP_API_KEY}, endpoint from {@code SP_HOST} and {@code SP_PORT}.
+     * The trailing {@code true} flag is passed on to {@code StreamPipesClient.create} unchanged;
+     * its meaning is not visible here - TODO confirm against the client API.
+     *
+     * @return the client instance
+     * @throws NumberFormatException if {@code SP_PORT} is missing or not an integer
+     */
+    private StreamPipesClient createStreamPipesClient() {
+        final String user = System.getenv("SP_USER");
+        final String apiKey = System.getenv("SP_API_KEY");
+        final StreamPipesCredentials credentials = StreamPipesCredentials.from(user, apiKey);
+
+        final String host = System.getenv("SP_HOST");
+        final int port = Integer.parseInt(System.getenv("SP_PORT"));
+        return StreamPipesClient.create(host, port, credentials, true);
+    }
+
+    /**
+     * Writes the new value into the matching reconfigurable property of the pipeline and
+     * then submits the pipeline for reconfiguration.
+     *
+     * @param pipeline                pipeline to modify and reconfigure
+     * @param fstInternalName         internal name of the free-text property to change
+     * @param fstReconfigurationValue value to set
+     */
+    private void prepareAndReconfigurePipeline(Pipeline pipeline,
+                                               String fstInternalName,
+                                               String fstReconfigurationValue) {
+        // Step 1: update the local pipeline model
+        preparingPipeline(pipeline, fstInternalName, fstReconfigurationValue);
+        // Step 2: push the updated model through the client
+        reconfiguringPipeline(pipeline, fstReconfigurationValue);
+    }
+
+    /**
+     * Sets {@code fstReconfigurationValue} on every reconfigurable free-text static property
+     * named {@code fstInternalName}, across all processors of the pipeline. Only the local
+     * pipeline object is changed here.
+     *
+     * @param pipeline                pipeline whose processors are updated
+     * @param fstInternalName         internal name of the property to change
+     * @param fstReconfigurationValue new value; also parsed as a float for the log message
+     * @throws NumberFormatException if the value is not a parsable float
+     */
+    private void preparingPipeline(Pipeline pipeline, String fstInternalName, String fstReconfigurationValue) {
+        final String pipelineLabel = pipeline.getName();
+        final float loadPercent = Float.parseFloat(fstReconfigurationValue) * 100;
+        LOG.info("Set CPU load for {} pipeline to: {} percent", pipelineLabel, loadPercent);
+
+        for (DataProcessorInvocation processor : pipeline.getSepas()) {
+            processor.getStaticProperties().stream()
+                    .filter(FreeTextStaticProperty.class::isInstance)
+                    .map(FreeTextStaticProperty.class::cast)
+                    .filter(FreeTextStaticProperty::isReconfigurable)
+                    .filter(property -> property.getInternalName().equals(fstInternalName))
+                    .forEach(property -> property.setValue(fstReconfigurationValue));
+        }
+    }
+
+    /**
+     * Submits the pipeline for reconfiguration and logs the outcome.
+     *
+     * The success message is logged only when the returned status reports success; a failed
+     * reconfiguration is logged as a warning together with the status title.
+     *
+     * @param pipeline                pipeline to reconfigure
+     * @param fstReconfigurationValue value that was set, used for logging only
+     */
+    private void reconfiguringPipeline(Pipeline pipeline, String fstReconfigurationValue) {
+        LOG.info("Reconfiguration triggered with value {}", fstReconfigurationValue);
+        PipelineOperationStatus statusReconfiguration = client.pipelines().reconfigure(pipeline);
+        if (statusReconfiguration.isSuccess()) {
+            LOG.info("Pipeline {} successfully reconfigured", pipeline.getName());
+        } else {
+            LOG.warn("Pipeline {} could not be reconfigured: {}",
+                    pipeline.getName(), statusReconfiguration.getTitle());
+        }
+    }
+
+ /**
+  * Stops the given pipeline through the client and, on success, marks that same pipeline
+  * object as no longer running.
+  *
+  * @param pipeline pipeline to stop
+  */
+ private void stoppingPipeline(Pipeline pipeline) {
+   PipelineOperationStatus stopStatus = client.pipelines().stop(pipeline);
+   if (stopStatus.isSuccess()) {
+     LOG.info("Stop status pipeline: {}", stopStatus.getTitle());
+     // Update the pipeline that was actually stopped; previously the low prio pipeline was
+     // flagged regardless of which pipeline had been passed in.
+     pipeline.setRunning(false);
+   } else {
+     // Pass the name as a logging argument so the {} placeholder is actually filled.
+     LOG.warn("Pipeline {} could not be stopped.", stopStatus.getPipelineName());
+   }
+ }
+
+ /**
+  * Fetches all pipelines through the client and returns the first one with the given name.
+  *
+  * @param pipelineName exact pipeline name to look for
+  * @return the first pipeline whose name equals {@code pipelineName}
+  * @throws RuntimeException if no pipeline carries that name
+  */
+ private Pipeline findPipelineByName(String pipelineName) {
+   for (Pipeline candidate : client.pipelines().all()) {
+     if (candidate.getName().equals(pipelineName)) {
+       return candidate;
+     }
+   }
+   throw new RuntimeException("Pipeline not found");
+ }
+
+ /**
+  * Returns the deployment target node id of the first processor in the pipeline that has the
+  * given name.
+  *
+  * @param pipeline pipeline whose processors are searched
+  * @param processorName exact processor name to look for
+  * @return the deployment target node id of the matching processor
+  * @throws RuntimeException if the pipeline contains no processor with that name
+  */
+ private String findDeploymentTargetByProcessorName(Pipeline pipeline, String processorName) {
+   for (DataProcessorInvocation candidate : pipeline.getSepas()) {
+     if (candidate.getName().equals(processorName)) {
+       return candidate.getDeploymentTargetNodeId();
+     }
+   }
+   throw new RuntimeException("Processor not found");
+ }
+
+ /**
+  * Creates (without starting) a thread that subscribes to the "Offloading" topic on the MQTT
+  * broker named by the SP_LOGGING_MQTT_URL environment variable. When a comma-separated message
+  * containing the field "offloading done" arrives, the given CPU step thread is interrupted and
+  * the consumer is closed.
+  *
+  * @param cpuStepThread thread to interrupt once an offloading has been reported
+  * @return the observer thread, not yet started
+  */
+ private Thread offloadingObserverThread(Thread cpuStepThread) {
+   LOG.info("Offloading observer thread started");
+   // A plain Runnable replaces the former "synchronized run()" override, which locked on the
+   // Thread object itself without protecting any shared state.
+   return new Thread(() -> {
+     String mqttLoggingUrlString = System.getenv("SP_LOGGING_MQTT_URL");
+     if (mqttLoggingUrlString == null) {
+       LOG.error("SP_LOGGING_MQTT_URL is not set, cannot observe offloading events");
+       return;
+     }
+     try {
+       MqttTransportProtocol transportProtocol = makeMqttTransportProtocol(
+           new URI(mqttLoggingUrlString),
+           "Offloading");
+
+       MqttConsumer consumer = new MqttConsumer();
+       consumer.connect(transportProtocol, event -> {
+         String[] eventArray = new String(event, StandardCharsets.UTF_8).split(",");
+         boolean offloaded = Arrays.asList(eventArray).contains("offloading done");
+         if (offloaded) {
+           // Guard the index: a short message must not blow up the callback.
+           String offloadedElement = eventArray.length > 4 ? eventArray[4] : "<unknown>";
+           LOG.info("Low prio pipeline element offloaded {}", offloadedElement);
+           cpuStepThread.interrupt();
+           try {
+             consumer.close();
+           } catch (Exception e) {
+             LOG.warn("Could not close MQTT consumer", e);
+           }
+         }
+       });
+     } catch (URISyntaxException e) {
+       LOG.error("Invalid MQTT logging URL: {}", mqttLoggingUrlString, e);
+     }
+   });
+ }
+
+ /**
+  * Creates (without starting) the thread that repeatedly walks through the configured CPU
+  * steps. For each step it reconfigures the "load" property of the low and the high prio
+  * pipeline and then sleeps for {@code timeToSleepBetweenSteps}. From the second step of a pass
+  * onwards both pipelines are re-fetched by name before their deployment targets are read.
+  * The thread runs until it is interrupted.
+  *
+  * @return the CPU step thread, not yet started
+  */
+ private Thread getInitialCpuStepThread() {
+   LOG.info("Initial CPU step thread started");
+   return new Thread(() -> {
+     try {
+       while (!Thread.currentThread().isInterrupted()) {
+         if (cpuSteps.size() <= 1) {
+           // Nothing to step through. Sleep instead of spinning at full speed so that this
+           // thread does not itself burn CPU while waiting to be interrupted.
+           Thread.sleep(timeToSleepBetweenSteps);
+           continue;
+         }
+
+         int counter = 0;
+         for (Tuple2<String, String> cpuStep : cpuSteps) {
+
+           if (counter > 0) {
+             LOG.info("Update pipeline locations");
+             pipelineLowPrio = findPipelineByName(pipelineNameLow);
+             pipelineHighPrio = findPipelineByName(pipelineNameHigh);
+           }
+
+           String lowPrioCpuStep = cpuStep.a;
+           String highPrioCpuStep = cpuStep.b;
+
+           String lowPrioNode = findDeploymentTargetByProcessorName(pipelineLowPrio, "CPU Burner");
+           String highPrioNode = findDeploymentTargetByProcessorName(pipelineHighPrio, "CPU Burner");
+
+           LOG.info("Prepare and reconfigure low prio pipeline {} with {} on node {}",
+               pipelineLowPrio.getName(),
+               "CPU Burner",
+               lowPrioNode);
+           prepareAndReconfigurePipeline(pipelineLowPrio, "load", lowPrioCpuStep);
+
+           LOG.info("Prepare and reconfigure high prio pipeline {} with {} on node {}",
+               pipelineHighPrio.getName(),
+               "CPU Burner",
+               highPrioNode);
+           prepareAndReconfigurePipeline(pipelineHighPrio, "load", highPrioCpuStep);
+
+           counter++;
+
+           Thread.sleep(timeToSleepBetweenSteps);
+         }
+       }
+     } catch (InterruptedException e) {
+       LOG.info("Interrupted cpu steps thread");
+       // Restore the flag so the interruption stays visible to anyone inspecting the thread.
+       Thread.currentThread().interrupt();
+     }
+   });
+ }
+
+ /**
+  * Builds an MQTT transport protocol description from the host and port of the given URI and a
+  * simple topic definition for the given topic.
+  *
+  * @param uri broker location; only its host and port are used
+  * @param topic topic name to wrap in a simple topic definition
+  * @return the populated transport protocol
+  */
+ private MqttTransportProtocol makeMqttTransportProtocol(URI uri, String topic) {
+   MqttTransportProtocol protocol = new MqttTransportProtocol();
+   protocol.setBrokerHostname(uri.getHost());
+   protocol.setPort(uri.getPort());
+   protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
+   return protocol;
+ }
+
+ /**
+  * Reads the semicolon-separated CPU load steps from OFFLOADING_CPU_LOAD_STEPS_LOW_PRIO and
+  * OFFLOADING_CPU_LOAD_STEPS_HIGH_PRIO and pairs them up by position.
+  *
+  * @return one tuple per step, with the low prio value in {@code a} and the high prio value in {@code b}
+  * @throws IllegalStateException if either environment variable is not set
+  * @throws RuntimeException if the two variables hold a different number of steps
+  */
+ private List<Tuple2<String, String>> loadFromEnv() {
+   String lowPrioRaw = System.getenv("OFFLOADING_CPU_LOAD_STEPS_LOW_PRIO");
+   String highPrioRaw = System.getenv("OFFLOADING_CPU_LOAD_STEPS_HIGH_PRIO");
+   // Fail with a message that names the missing configuration instead of a bare
+   // NullPointerException from split().
+   if (lowPrioRaw == null || highPrioRaw == null) {
+     throw new IllegalStateException("Environment variables OFFLOADING_CPU_LOAD_STEPS_LOW_PRIO and "
+         + "OFFLOADING_CPU_LOAD_STEPS_HIGH_PRIO must both be set");
+   }
+
+   String[] cpuLoadStepsLowPrio = lowPrioRaw.split(";");
+   String[] cpuLoadStepsHighPrio = highPrioRaw.split(";");
+
+   if (cpuLoadStepsLowPrio.length != cpuLoadStepsHighPrio.length) {
+     throw new RuntimeException("CPU load steps array must be of equal size");
+   }
+
+   // Named "steps" so the local no longer shadows the cpuSteps field used elsewhere in the class.
+   int length = cpuLoadStepsLowPrio.length;
+   List<Tuple2<String, String>> steps = new ArrayList<>(length);
+   for (int i = 0; i < length; i++) {
+     steps.add(new Tuple2<>(cpuLoadStepsLowPrio[i], cpuLoadStepsHighPrio[i]));
+   }
+   return steps;
+ }
+
+}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/TestConsumer.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/TestConsumer.java
new file mode 100644
index 0000000..04f4284
--- /dev/null
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/TestConsumer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.performance.performancetest;
+
+import org.apache.streampipes.messaging.mqtt.MqttConsumer;
+import org.apache.streampipes.model.Tuple2;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
+import org.apache.streampipes.model.grounding.TopicDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Standalone harness that runs a simulated CPU step thread next to an MQTT observer thread.
+ * The observer counts "offloading done" messages per origin device on the "Offloading" topic
+ * and interrupts the step thread once every observed device has reached
+ * {@code maxOffloadingActions} offloadings.
+ */
+public class TestConsumer {
+
+  // Bound to this class; it previously reported under CountBasedOffloadingTest.
+  private static final Logger LOG = LoggerFactory.getLogger(TestConsumer.class);
+  private static final int maxOffloadingActions = 2;
+  /** Pause between two simulated CPU steps, and between idle checks, in milliseconds. */
+  private static final long STEP_INTERVAL_MS = 5000;
+
+  // Set by the observer callback once the limit is reached; not read anywhere in this class.
+  private static volatile boolean maxOffloadingsReached = false;
+  private static final String[] cpuSteps = new String[]{"0.2", "0.5", "0.9"};
+
+  /**
+   * Starts both threads and blocks until each of them has terminated.
+   *
+   * @param args unused
+   */
+  public static void main(String... args) throws Exception {
+
+    Thread initialRampUpThread = getInitialCpuStepThread();
+    Thread offloadingObserverThread = offloadingObserverThread(initialRampUpThread);
+    initialRampUpThread.start();
+    offloadingObserverThread.start();
+
+    try {
+      LOG.info("Wait until max. number offloadings is completed: {}", maxOffloadingActions);
+      initialRampUpThread.join();
+      offloadingObserverThread.join();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for the worker threads", e);
+      Thread.currentThread().interrupt();
+    }
+
+    LOG.info("Continue Outside...");
+  }
+
+  /**
+   * Creates (without starting) the thread that connects an MQTT consumer to the "Offloading"
+   * topic of the broker named by SP_LOGGING_MQTT_URL and tracks offloadings per origin device.
+   *
+   * @param initialRampUpThread thread to interrupt once the offloading limit is reached
+   * @return the observer thread, not yet started
+   */
+  private static Thread offloadingObserverThread(Thread initialRampUpThread) {
+    LOG.info("Offloading observer thread started");
+    return new Thread(() -> {
+      // Only touched from the consumer callback.
+      // NOTE(review): assumes the MQTT client delivers callbacks on a single thread - confirm.
+      Map<String, Integer> offloadingDeviceMap = new HashMap<>();
+
+      String mqttLoggingUrlString = System.getenv("SP_LOGGING_MQTT_URL");
+      if (mqttLoggingUrlString == null) {
+        LOG.error("SP_LOGGING_MQTT_URL is not set, cannot observe offloading events");
+        return;
+      }
+
+      try {
+        MqttTransportProtocol transportProtocol = makeMqttTransportProtocol(
+            new URI(mqttLoggingUrlString),
+            "Offloading");
+
+        MqttConsumer consumer = new MqttConsumer();
+        consumer.connect(transportProtocol, event -> {
+          String message = new String(event, StandardCharsets.UTF_8);
+          String[] eventArray = message.split(",");
+          if (!Arrays.asList(eventArray).contains("offloading done")) {
+            return;
+          }
+          // Fields 1 (origin device) and 4 (offloaded element) are read below.
+          if (eventArray.length <= 4) {
+            LOG.warn("Ignoring malformed offloading message: {}", message);
+            return;
+          }
+
+          LOG.info("Pipeline element offloaded {}", eventArray[4]);
+
+          String originDeviceId = eventArray[1];
+          int offloadingCount = offloadingDeviceMap.merge(originDeviceId, 1, Integer::sum);
+          LOG.info("Current number of offloadings for {}: {}/{}",
+              originDeviceId, offloadingCount, maxOffloadingActions);
+
+          boolean reachedMaxOffloadingActions = offloadingDeviceMap.values()
+              .stream()
+              .allMatch(numOffloadingActions -> numOffloadingActions >= maxOffloadingActions);
+
+          if (reachedMaxOffloadingActions) {
+            LOG.info("Reached max offloading actions");
+            maxOffloadingsReached = true;
+            initialRampUpThread.interrupt();
+            try {
+              consumer.close();
+            } catch (Exception e) {
+              LOG.warn("Could not close MQTT consumer", e);
+            }
+          }
+        });
+      } catch (URISyntaxException e) {
+        LOG.error("Invalid MQTT logging URL: {}", mqttLoggingUrlString, e);
+      }
+    });
+  }
+
+  /**
+   * Builds an MQTT transport protocol description from the host and port of the given URI and
+   * a simple topic definition for the given topic.
+   */
+  private static MqttTransportProtocol makeMqttTransportProtocol(URI uri, String topic) {
+    MqttTransportProtocol transportProtocol = new MqttTransportProtocol();
+    TopicDefinition simpleTopicDefinition = new SimpleTopicDefinition(topic);
+    transportProtocol.setBrokerHostname(uri.getHost());
+    transportProtocol.setPort(uri.getPort());
+    transportProtocol.setTopicDefinition(simpleTopicDefinition);
+    return transportProtocol;
+  }
+
+  /**
+   * Creates (without starting) the thread that logs each CPU step once, pausing
+   * {@code STEP_INTERVAL_MS} after each, and then idles until it is interrupted.
+   *
+   * @return the CPU step thread, not yet started
+   */
+  private static Thread getInitialCpuStepThread() {
+    LOG.info("Initial CPU step thread started");
+    return new Thread(() -> {
+      boolean stepsApplied = false;
+      try {
+        while (!Thread.currentThread().isInterrupted()) {
+          if (!stepsApplied && cpuSteps.length > 1) {
+            for (String cpuStep : cpuSteps) {
+              LOG.info("Reconfigure with {}", cpuStep);
+              Thread.sleep(STEP_INTERVAL_MS);
+            }
+            stepsApplied = true;
+          } else {
+            // All steps done (or nothing to do): wait for the interrupt without spinning.
+            Thread.sleep(STEP_INTERVAL_MS);
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted cpu steps thread");
+        Thread.currentThread().interrupt();
+      }
+    });
+  }
+
+}