You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/06 09:05:46 UTC

[incubator-streampipes] branch edge-extensions updated (997d4a7 -> 4e57275)

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a change to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from 997d4a7  [hotfix] set broker according to deployment target
     new 613242c  generified evaluation logging
     new 1e8f137  introduced offloading tests
     new 4e57275  fixed issue with offloading target selection

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../logging/evaluation/EvaluationLogger.java       | 30 +++++++++++++++++-----
 .../controller/api/InvocableEntityResource.java    |  4 +--
 .../offloading/OffloadingPolicyManager.java        |  6 ++---
 .../ThresholdViolationOffloadingPolicy.java        |  5 ++++
 .../selection/RandomSelectionStrategy.java         |  6 ++++-
 .../streampipes/performance/TestFactory.java       | 23 ++++++++++++-----
 .../performance/performancetest/GenericTest.java   |  2 +-
 .../pipeline/PipelineMigrationExecutor.java        | 10 ++++----
 .../migration/MigrationPipelineGenerator.java      |  2 ++
 9 files changed, 63 insertions(+), 25 deletions(-)

[incubator-streampipes] 01/03: generified evaluation logging

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 613242c95a0329c7cb3b9d5db54de14153e5b0cb
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 6 10:59:37 2021 +0200

    generified evaluation logging
---
 .../logging/evaluation/EvaluationLogger.java       | 30 +++++++++++++++++-----
 .../controller/api/InvocableEntityResource.java    |  4 +--
 .../offloading/OffloadingPolicyManager.java        |  6 ++---
 .../ThresholdViolationOffloadingPolicy.java        |  5 ++++
 .../performance/performancetest/GenericTest.java   |  2 +-
 .../pipeline/PipelineMigrationExecutor.java        | 10 ++++----
 6 files changed, 40 insertions(+), 17 deletions(-)

diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 9790b0e..19ccb4b 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -28,6 +28,7 @@ public class EvaluationLogger {
     private static EvaluationLogger instance = null;
     private final MQTT mqtt;
     private final BlockingConnection connection;
+    private final String deviceId;
 
     public static EvaluationLogger getInstance(){
         if(instance==null) instance = new EvaluationLogger();
@@ -36,6 +37,12 @@ public class EvaluationLogger {
 
     private EvaluationLogger(){
         String loggingUrl = System.getenv("SP_LOGGING_MQTT_URL");
+        String nodeId = System.getenv("SP_LOGGING_MQTT_URL");
+        if (nodeId != null){
+            this.deviceId = nodeId;
+        }else {
+            this.deviceId = "default";
+        }
         mqtt = new MQTT();
         try {
             mqtt.setHost(loggingUrl);
@@ -51,16 +58,27 @@ public class EvaluationLogger {
     }
 
     public void logMQTT(String topic, Object[] elements){
+        String message = System.currentTimeMillis() + "," + this.deviceId + ",";
+        for(Object element:elements)
+            message += element + ",";
+        message = message.substring(0, message.length()-1);
+        publish(topic, message);
+    }
+
+    public void logHeader(String topic, Object[] elements){
         String message = "";
         for(Object element:elements)
             message += element + ",";
-        if (message.length() > 0) {
+        if (message.length() > 0)
             message = message.substring(0, message.length()-1);
-            try {
-                connection.publish(topic, message.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+        publish(topic, message);
+    }
+
+    private void publish(String topic, String message){
+        try {
+            connection.publish(topic, message.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
+        } catch (Exception e) {
+            e.printStackTrace();
         }
     }
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
index 324efb5..b005ab7 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
@@ -84,7 +84,7 @@ public class InvocableEntityResource extends AbstractResource {
         //TODO: Remove Logger after debugging
         InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
         EvaluationLogger logger = EvaluationLogger.getInstance();
-        Object[] line = {System.currentTimeMillis() ,"Element detached"};
+        Object[] line = {"Element detached"};
         logger.logMQTT("Offloading", line);
         Response resp = PipelineElementManager.getInstance().detach(graph, runningInstanceId);
         RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
@@ -114,7 +114,7 @@ public class InvocableEntityResource extends AbstractResource {
                     .get(0))
                     .getValue();
         }
-        Object[] line = {System.currentTimeMillis() ,"reconfiguration request received", nrRuns++, value};
+        Object[] line = {"reconfiguration request received", nrRuns++, value};
         logger.logMQTT("Reconfiguration", line);
         InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
         return ok(PipelineElementManager.getInstance().reconfigure(graph, reconfigurationEntity));
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index e705d3b..73ae24e 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -70,7 +70,7 @@ public class OffloadingPolicyManager {
             //Currently uses the first violated policy. Could be extended to take the degree of policy violation into
             // account
             //TODO: Remove Logger after debugging
-            Object[] line = {System.currentTimeMillis() ,"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
+            Object[] line = {"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
             EvaluationLogger.getInstance().logMQTT("Offloading", line);
             triggerOffloading(violatedPolicies.get(0));
         }
@@ -80,7 +80,7 @@ public class OffloadingPolicyManager {
 
     private void triggerOffloading(OffloadingStrategy strategy){
         InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
-        Object[] line = {System.currentTimeMillis() ,"entity to offload selected"};
+        Object[] line = {"entity to offload selected"};
         EvaluationLogger.getInstance().logMQTT("Offloading", line);
         if(offloadEntity != null){
             Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
@@ -88,7 +88,7 @@ public class OffloadingPolicyManager {
             String appId = offloadEntity.getAppId();
             String pipelineName = offloadEntity.getCorrespondingPipeline();
 
-            Object[] line_done = {System.currentTimeMillis() ,"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
+            Object[] line_done = {"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
             EvaluationLogger.getInstance().logMQTT("Offloading", line_done);
 
             if(resp.isSuccess()){
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index aa30b3e..39965d9 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.node.controller.management.offloading.strategies.policies;
 
+import org.apache.streampipes.logging.evaluation.EvaluationLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +65,8 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
                 for(T value : this.history){
                     if(value.compareTo(this.threshold) > 0){
                         numViolations++;
+                        Object[] line = {"policy violation #" + numViolations};
+                        EvaluationLogger.getInstance().logMQTT("Offloading", line);
                     }
                 }
                 break;
@@ -71,6 +74,8 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
                 for(T value : this.history){
                     if(value.compareTo(this.threshold) < 0){
                         numViolations++;
+                        Object[] line = {"policy violation #" + numViolations};
+                        EvaluationLogger.getInstance().logMQTT("Offloading", line);
                     }
                 }
                 break;
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 6d6ed70..55ce48b 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -117,7 +117,7 @@ public class GenericTest implements Test{
                             sp.setValue(Float.toString(this.reconfigurableValue++));
                         }
                     }));
-            line = new Object[]{System.currentTimeMillis(), "Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
+            line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
             System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
             PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
             System.out.println(message.getTitle());
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index 8b0631d..2de5bc3 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -103,7 +103,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long start_target_duration = System.nanoTime() - before_start_target;
-        Object[] line_start_target = {System.currentTimeMillis(), "start target element","",start_target_duration,start_target_duration/1000000000.0};
+        Object[] line_start_target = {"start target element","",start_target_duration,start_target_duration/1000000000.0};
         logger.logMQTT("Migration", line_start_target);
 
         // Stop relays from origin predecessor
@@ -114,7 +114,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long stop_relays_origin_duration = System.nanoTime() - downtime_beginning;
-        Object[] line_stop_relay = {System.currentTimeMillis(), "stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0};
+        Object[] line_stop_relay = {"stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0};
         logger.logMQTT("Migration", line_stop_relay);
 
         // Start relays to target after migration
@@ -125,11 +125,11 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long start_relay_target_duration = System.nanoTime() - before_start_relay_target;
-        Object[] line_start_relay = {System.currentTimeMillis(), "start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0};
+        Object[] line_start_relay = {"start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0};
         logger.logMQTT("Migration", line_start_relay);
 
         long downtime = System.nanoTime() - downtime_beginning;
-        Object[] line_downtime = {System.currentTimeMillis() ,"downtime", "", downtime, downtime/1000000000.0};
+        Object[] line_downtime = {"downtime", "", downtime, downtime/1000000000.0};
         logger.logMQTT("Migration", line_downtime);
 
         //Stop origin and associated relay
@@ -140,7 +140,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long stop_origin_duration = System.nanoTime() - before_stop_origin;
-        Object[] line_stop_origin = {System.currentTimeMillis(), "stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0};
+        Object[] line_stop_origin = {"stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0};
         logger.logMQTT("Migration", line_stop_origin);
 
         List<InvocableStreamPipesEntity> graphs = new ArrayList<>();

[incubator-streampipes] 03/03: fixed issue with offloading target selection

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 4e57275424c2a11d9cad9514d878aa6fde903f15
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 6 11:00:58 2021 +0200

    fixed issue with offloading target selection
---
 .../offloading/strategies/selection/RandomSelectionStrategy.java    | 6 +++++-
 .../streampipes/manager/migration/MigrationPipelineGenerator.java   | 2 ++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
index e1de78c..f89c9b5 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
@@ -19,16 +19,20 @@
 package org.apache.streampipes.node.controller.management.offloading.strategies.selection;
 
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
 
 import java.util.List;
 import java.util.Random;
+import java.util.stream.Collectors;
 
 public class RandomSelectionStrategy implements SelectionStrategy{
 
     @Override
     public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
-        List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll();
+        //Sinks cannot be migrated at the moment, so only data processors are considered as offloading candidates
+        List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll().stream().
+                filter(e -> e instanceof DataProcessorInvocation).collect(Collectors.toList());
         if(instances.size() == 0)
             return null;
         return instances.get(new Random().nextInt(instances.size()));
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index 586b89d..dba5c7b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@ -144,6 +144,8 @@ public class MigrationPipelineGenerator {
                 .filter(dp -> dp.getDeploymentRunningInstanceId().equals(entityToMigrate.getDeploymentRunningInstanceId()))
                 .findFirst();
 
+        if (!originalInvocation.isPresent())
+            return null;
         int index = correspondingPipeline.getSepas().indexOf(originalInvocation.get());
 
         Pipeline targetPipeline;

[incubator-streampipes] 02/03: introduced offloading tests

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 1e8f1374a9162769fff557a1af5f310e4a1b6507
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 6 11:00:13 2021 +0200

    introduced offloading tests
---
 .../streampipes/performance/TestFactory.java       | 23 +++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 9a8bff1..951a540 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -28,19 +28,23 @@ public class TestFactory {
         EvaluationLogger logger = EvaluationLogger.getInstance();
         switch (System.getenv("TEST_TYPE")){
             case "Deployment":
-                Object[] header_deployment = {"timestampInMillis", "event", "numberOfRuns", "durationInNanos", "durationInSecs"};
-                logger.logMQTT("Deployment", header_deployment);
+                Object[] header_deployment = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs"};
+                logger.logHeader("Deployment", header_deployment);
                 return getDeploymentTest();
             case "Latency":
                 return getLatencyTest();
             case "Migration":
-                Object[] header_migration = {"timestampInMillis", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode"};
-                logger.logMQTT("Migration", header_migration);
+                Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode"};
+                logger.logHeader("Migration", header_migration);
                 return getMigrationTest();
             case "Reconfiguration":
-                Object[] header_reconfigure = {"timestampInMillis", "event", "numberOfRuns", "reconfigurationValue"};
-                logger.logMQTT("Reconfiguration", header_reconfigure);
+                Object[] header_reconfigure = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "reconfigurationValue"};
+                logger.logHeader("Reconfiguration", header_reconfigure);
                 return getReconfigurationTest();
+            case "Offloading":
+                Object[] header_offloading = {"timestampInMillis", "deviceId", "event"};
+                logger.logHeader("Offloading", header_offloading);
+                return getOffloadingTest();
             default:
                 throw new RuntimeException("No test configuration found.");
         }
@@ -59,7 +63,7 @@ public class TestFactory {
 
     public static Test getMigrationTest(){
         return new GenericTest(getPipelineName(), false, true,
-                false, 15000, 5000);
+                false, 15000, 7500);
     }
 
     public static Test getReconfigurationTest(){
@@ -67,6 +71,11 @@ public class TestFactory {
                 true, 10000, 5000);
     }
 
+    public static Test getOffloadingTest(){
+        return new GenericTest(getPipelineName(), false, false,
+                true, 20000, 600000);
+    }
+
     //Helpers
     private static String getPipelineName(){
         String pipelineName = System.getenv("TEST_PIPELINE_NAME");