You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/06 09:05:47 UTC

[incubator-streampipes] 01/03: generified evaluation logging

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 613242c95a0329c7cb3b9d5db54de14153e5b0cb
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 6 10:59:37 2021 +0200

    generified evaluation logging
---
 .../logging/evaluation/EvaluationLogger.java       | 30 +++++++++++++++++-----
 .../controller/api/InvocableEntityResource.java    |  4 +--
 .../offloading/OffloadingPolicyManager.java        |  6 ++---
 .../ThresholdViolationOffloadingPolicy.java        |  5 ++++
 .../performance/performancetest/GenericTest.java   |  2 +-
 .../pipeline/PipelineMigrationExecutor.java        | 10 ++++----
 6 files changed, 40 insertions(+), 17 deletions(-)

diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 9790b0e..19ccb4b 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -28,6 +28,7 @@ public class EvaluationLogger {
     private static EvaluationLogger instance = null;
     private final MQTT mqtt;
     private final BlockingConnection connection;
+    private final String deviceId;
 
     public static EvaluationLogger getInstance(){
         if(instance==null) instance = new EvaluationLogger();
@@ -36,6 +37,12 @@ public class EvaluationLogger {
 
     private EvaluationLogger(){
         String loggingUrl = System.getenv("SP_LOGGING_MQTT_URL");
+        String nodeId = System.getenv("SP_LOGGING_MQTT_URL");
+        if (nodeId != null){
+            this.deviceId = nodeId;
+        }else {
+            this.deviceId = "default";
+        }
         mqtt = new MQTT();
         try {
             mqtt.setHost(loggingUrl);
@@ -51,16 +58,27 @@ public class EvaluationLogger {
     }
 
     public void logMQTT(String topic, Object[] elements){
+        String message = System.currentTimeMillis() + "," + this.deviceId + ",";
+        for(Object element:elements)
+            message += element + ",";
+        message = message.substring(0, message.length()-1);
+        publish(topic, message);
+    }
+
+    public void logHeader(String topic, Object[] elements){
         String message = "";
         for(Object element:elements)
             message += element + ",";
-        if (message.length() > 0) {
+        if (message.length() > 0)
             message = message.substring(0, message.length()-1);
-            try {
-                connection.publish(topic, message.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+        publish(topic, message);
+    }
+
+    private void publish(String topic, String message){
+        try {
+            connection.publish(topic, message.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
+        } catch (Exception e) {
+            e.printStackTrace();
         }
     }
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
index 324efb5..b005ab7 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
@@ -84,7 +84,7 @@ public class InvocableEntityResource extends AbstractResource {
         //TODO: Remove Logger after debugging
         InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
         EvaluationLogger logger = EvaluationLogger.getInstance();
-        Object[] line = {System.currentTimeMillis() ,"Element detached"};
+        Object[] line = {"Element detached"};
         logger.logMQTT("Offloading", line);
         Response resp = PipelineElementManager.getInstance().detach(graph, runningInstanceId);
         RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
@@ -114,7 +114,7 @@ public class InvocableEntityResource extends AbstractResource {
                     .get(0))
                     .getValue();
         }
-        Object[] line = {System.currentTimeMillis() ,"reconfiguration request received", nrRuns++, value};
+        Object[] line = {"reconfiguration request received", nrRuns++, value};
         logger.logMQTT("Reconfiguration", line);
         InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
         return ok(PipelineElementManager.getInstance().reconfigure(graph, reconfigurationEntity));
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index e705d3b..73ae24e 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -70,7 +70,7 @@ public class OffloadingPolicyManager {
             //Currently uses the first violated policy. Could be extended to take the degree of policy violation into
             // account
             //TODO: Remove Logger after debugging
-            Object[] line = {System.currentTimeMillis() ,"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
+            Object[] line = {"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
             EvaluationLogger.getInstance().logMQTT("Offloading", line);
             triggerOffloading(violatedPolicies.get(0));
         }
@@ -80,7 +80,7 @@ public class OffloadingPolicyManager {
 
     private void triggerOffloading(OffloadingStrategy strategy){
         InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
-        Object[] line = {System.currentTimeMillis() ,"entity to offload selected"};
+        Object[] line = {"entity to offload selected"};
         EvaluationLogger.getInstance().logMQTT("Offloading", line);
         if(offloadEntity != null){
             Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
@@ -88,7 +88,7 @@ public class OffloadingPolicyManager {
             String appId = offloadEntity.getAppId();
             String pipelineName = offloadEntity.getCorrespondingPipeline();
 
-            Object[] line_done = {System.currentTimeMillis() ,"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
+            Object[] line_done = {"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
             EvaluationLogger.getInstance().logMQTT("Offloading", line_done);
 
             if(resp.isSuccess()){
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index aa30b3e..39965d9 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.node.controller.management.offloading.strategies.policies;
 
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +65,8 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
                 for(T value : this.history){
                     if(value.compareTo(this.threshold) > 0){
                         numViolations++;
+                        Object[] line = {"policy violation #" + numViolations};
+                        EvaluationLogger.getInstance().logMQTT("Offloading", line);
                     }
                 }
                 break;
@@ -71,6 +74,8 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
                 for(T value : this.history){
                     if(value.compareTo(this.threshold) < 0){
                         numViolations++;
+                        Object[] line = {"policy violation #" + numViolations};
+                        EvaluationLogger.getInstance().logMQTT("Offloading", line);
                     }
                 }
                 break;
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 6d6ed70..55ce48b 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -117,7 +117,7 @@ public class GenericTest implements Test{
                             sp.setValue(Float.toString(this.reconfigurableValue++));
                         }
                     }));
-            line = new Object[]{System.currentTimeMillis(), "Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
+            line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
             System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
             PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
             System.out.println(message.getTitle());
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index 8b0631d..2de5bc3 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -103,7 +103,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long start_target_duration = System.nanoTime() - before_start_target;
-        Object[] line_start_target = {System.currentTimeMillis(), "start target element","",start_target_duration,start_target_duration/1000000000.0};
+        Object[] line_start_target = {"start target element","",start_target_duration,start_target_duration/1000000000.0};
         logger.logMQTT("Migration", line_start_target);
 
         // Stop relays from origin predecessor
@@ -114,7 +114,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long stop_relays_origin_duration = System.nanoTime() - downtime_beginning;
-        Object[] line_stop_relay = {System.currentTimeMillis(), "stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0};
+        Object[] line_stop_relay = {"stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0};
         logger.logMQTT("Migration", line_stop_relay);
 
         // Start relays to target after migration
@@ -125,11 +125,11 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long start_relay_target_duration = System.nanoTime() - before_start_relay_target;
-        Object[] line_start_relay = {System.currentTimeMillis(), "start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0};
+        Object[] line_start_relay = {"start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0};
         logger.logMQTT("Migration", line_start_relay);
 
         long downtime = System.nanoTime() - downtime_beginning;
-        Object[] line_downtime = {System.currentTimeMillis() ,"downtime", "", downtime, downtime/1000000000.0};
+        Object[] line_downtime = {"downtime", "", downtime, downtime/1000000000.0};
         logger.logMQTT("Migration", line_downtime);
 
         //Stop origin and associated relay
@@ -140,7 +140,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long stop_origin_duration = System.nanoTime() - before_stop_origin;
-        Object[] line_stop_origin = {System.currentTimeMillis(), "stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0};
+        Object[] line_stop_origin = {"stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0};
         logger.logMQTT("Migration", line_stop_origin);
 
         List<InvocableStreamPipesEntity> graphs = new ArrayList<>();