You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/13 18:55:57 UTC
[incubator-streampipes] 01/04: fixed issues with evaluation logging
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 98c810138f65e3bd5c2ae7db337029ccc9812da9
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Oct 8 09:40:12 2021 +0200
fixed issues with evaluation logging
---
.../streampipes/logging/evaluation/EvaluationLogger.java | 2 +-
.../policies/ThresholdViolationOffloadingPolicy.java | 15 +++++++++++----
.../org/apache/streampipes/performance/TestFactory.java | 4 ++--
.../performance/performancetest/GenericTest.java | 15 +++++++++++++--
4 files changed, 27 insertions(+), 9 deletions(-)
diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 19ccb4b..ccd0a7a 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -37,7 +37,7 @@ public class EvaluationLogger {
private EvaluationLogger(){
String loggingUrl = System.getenv("SP_LOGGING_MQTT_URL");
- String nodeId = System.getenv("SP_LOGGING_MQTT_URL");
+ String nodeId = System.getenv("SP_NODE_CONTROLLER_ID");
if (nodeId != null){
this.deviceId = nodeId;
}else {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index 39965d9..e7c60fb 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -54,6 +54,17 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
if(!this.history.offer(value)) {
this.history.poll();
this.history.offer(value);
+ //Only for logging; can be removed later
+ if(value.compareTo(this.threshold) > 0){
+ int numViolations = 0;
+ for(T val : this.history){
+ if(val.compareTo(this.threshold) > 0){
+ numViolations++;
+ }
+ }
+ Object[] line = {"policy violation #" + numViolations};
+ EvaluationLogger.getInstance().logMQTT("Offloading", line);
+ }
}
}
@@ -65,8 +76,6 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
for(T value : this.history){
if(value.compareTo(this.threshold) > 0){
numViolations++;
- Object[] line = {"policy violation #" + numViolations};
- EvaluationLogger.getInstance().logMQTT("Offloading", line);
}
}
break;
@@ -74,8 +83,6 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
for(T value : this.history){
if(value.compareTo(this.threshold) < 0){
numViolations++;
- Object[] line = {"policy violation #" + numViolations};
- EvaluationLogger.getInstance().logMQTT("Offloading", line);
}
}
break;
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 951a540..4cb6dbe 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -34,7 +34,7 @@ public class TestFactory {
case "Latency":
return getLatencyTest();
case "Migration":
- Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode"};
+ Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode", "success"};
logger.logHeader("Migration", header_migration);
return getMigrationTest();
case "Reconfiguration":
@@ -73,7 +73,7 @@ public class TestFactory {
public static Test getOffloadingTest(){
return new GenericTest(getPipelineName(), false, false,
- true, 20000, 600000);
+ true, 20000, 1500000);
}
//Helpers
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 55ce48b..892a5c6 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -98,7 +98,7 @@ public class GenericTest implements Test{
PipelineOperationStatus migrationMessage = client.pipelines().migrate(pipeline);
long migrationDuration = System.nanoTime() - beforeMigration;
if(testType.equals("Migration")){
- line = new Object[]{System.currentTimeMillis(), "Migration duration", nrRuns, migrationDuration, migrationDuration/1000000000.0,migrationNodes.a, migrationNodes.b, true};
+ line = new Object[]{"Migration duration", nrRuns, migrationDuration, migrationDuration/1000000000.0,migrationNodes.a, migrationNodes.b, true};
}
System.out.println(migrationMessage.getTitle());
if (!migrationMessage.isSuccess()) {
@@ -108,7 +108,8 @@ public class GenericTest implements Test{
}
//Reconfiguration
if (shouldBeReconfigured) {
- pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+ if (testType.equals("Reconfiguration"))
+ pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
.filter(FreeTextStaticProperty.class::isInstance)
.map(FreeTextStaticProperty.class::cast)
.filter(FreeTextStaticProperty::isReconfigurable)
@@ -117,6 +118,16 @@ public class GenericTest implements Test{
sp.setValue(Float.toString(this.reconfigurableValue++));
}
}));
+ else if (testType.equals("Offloading"))
+ pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+ .filter(FreeTextStaticProperty.class::isInstance)
+ .map(FreeTextStaticProperty.class::cast)
+ .filter(FreeTextStaticProperty::isReconfigurable)
+ .forEach(sp -> {
+ if (sp.getInternalName().equals("load")) {
+ sp.setValue(Float.toString(0.9f));
+ }
+ }));
line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);