You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/13 18:55:57 UTC

[incubator-streampipes] 01/04: fixed issues with evaluation logging

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 98c810138f65e3bd5c2ae7db337029ccc9812da9
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Oct 8 09:40:12 2021 +0200

    fixed issues with evaluation logging
---
 .../streampipes/logging/evaluation/EvaluationLogger.java  |  2 +-
 .../policies/ThresholdViolationOffloadingPolicy.java      | 15 +++++++++++----
 .../org/apache/streampipes/performance/TestFactory.java   |  4 ++--
 .../performance/performancetest/GenericTest.java          | 15 +++++++++++++--
 4 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 19ccb4b..ccd0a7a 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -37,7 +37,7 @@ public class EvaluationLogger {
 
     private EvaluationLogger(){
         String loggingUrl = System.getenv("SP_LOGGING_MQTT_URL");
-        String nodeId = System.getenv("SP_LOGGING_MQTT_URL");
+        String nodeId = System.getenv("SP_NODE_CONTROLLER_ID");
         if (nodeId != null){
             this.deviceId = nodeId;
         }else {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index 39965d9..e7c60fb 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -54,6 +54,17 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
         if(!this.history.offer(value)) {
             this.history.poll();
             this.history.offer(value);
+            //Only for logging; can be removed later
+            if(value.compareTo(this.threshold) > 0){
+                int numViolations = 0;
+                for(T val : this.history){
+                    if(val.compareTo(this.threshold) > 0){
+                        numViolations++;
+                    }
+                }
+                Object[] line = {"policy violation #" + numViolations};
+                EvaluationLogger.getInstance().logMQTT("Offloading", line);
+            }
         }
     }
 
@@ -65,8 +76,6 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
                 for(T value : this.history){
                     if(value.compareTo(this.threshold) > 0){
                         numViolations++;
-                        Object[] line = {"policy violation #" + numViolations};
-                        EvaluationLogger.getInstance().logMQTT("Offloading", line);
                     }
                 }
                 break;
@@ -74,8 +83,6 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
                 for(T value : this.history){
                     if(value.compareTo(this.threshold) < 0){
                         numViolations++;
-                        Object[] line = {"policy violation #" + numViolations};
-                        EvaluationLogger.getInstance().logMQTT("Offloading", line);
                     }
                 }
                 break;
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 951a540..4cb6dbe 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -34,7 +34,7 @@ public class TestFactory {
             case "Latency":
                 return getLatencyTest();
             case "Migration":
-                Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode"};
+                Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode", "success"};
                 logger.logHeader("Migration", header_migration);
                 return getMigrationTest();
             case "Reconfiguration":
@@ -73,7 +73,7 @@ public class TestFactory {
 
     public static Test getOffloadingTest(){
         return new GenericTest(getPipelineName(), false, false,
-                true, 20000, 600000);
+                true, 20000, 1500000);
     }
 
     //Helpers
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 55ce48b..892a5c6 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -98,7 +98,7 @@ public class GenericTest implements Test{
             PipelineOperationStatus migrationMessage = client.pipelines().migrate(pipeline);
             long migrationDuration = System.nanoTime() - beforeMigration;
             if(testType.equals("Migration")){
-                line = new Object[]{System.currentTimeMillis(), "Migration duration", nrRuns, migrationDuration, migrationDuration/1000000000.0,migrationNodes.a, migrationNodes.b, true};
+                line = new Object[]{"Migration duration", nrRuns, migrationDuration, migrationDuration/1000000000.0,migrationNodes.a, migrationNodes.b, true};
             }
             System.out.println(migrationMessage.getTitle());
             if (!migrationMessage.isSuccess()) {
@@ -108,7 +108,8 @@ public class GenericTest implements Test{
         }
         //Reconfiguration
         if (shouldBeReconfigured) {
-            pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+            if (testType.equals("Reconfiguration"))
+                pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
                     .filter(FreeTextStaticProperty.class::isInstance)
                     .map(FreeTextStaticProperty.class::cast)
                     .filter(FreeTextStaticProperty::isReconfigurable)
@@ -117,6 +118,16 @@ public class GenericTest implements Test{
                             sp.setValue(Float.toString(this.reconfigurableValue++));
                         }
                     }));
+            else if (testType.equals("Offloading"))
+                pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+                        .filter(FreeTextStaticProperty.class::isInstance)
+                        .map(FreeTextStaticProperty.class::cast)
+                        .filter(FreeTextStaticProperty::isReconfigurable)
+                        .forEach(sp -> {
+                            if (sp.getInternalName().equals("load")) {
+                                sp.setValue(Float.toString(0.9f));
+                            }
+                        }));
             line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
             System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
             PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);