You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/13 18:55:56 UTC

[incubator-streampipes] branch edge-extensions updated (e715c66 -> f037eff)

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a change to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from e715c66  [hotfix] support series connection of identical pipeline elements and call node controller for configuring connect adapters
     new 98c8101  fixed issues with evaluation logging
     new d2bfbd1  introduced policy reset after successful offloading
     new 9010d49  Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions
     new f037eff  update logger and fix logging inconsistencies

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../logging/evaluation/EvaluationLogger.java       | 23 ++++++++++++++--------
 .../controller/api/InvocableEntityResource.java    |  6 ++----
 .../offloading/OffloadingPolicyManager.java        | 10 ++++------
 .../ThresholdViolationOffloadingPolicy.java        | 14 +++++++++----
 .../statscollector/DockerStatsCollector.java       |  2 +-
 .../streampipes/performance/TestFactory.java       |  6 +++---
 .../performance/performancetest/GenericTest.java   | 15 ++++++++++++--
 .../pipeline/PipelineMigrationExecutor.java        | 15 +++++---------
 8 files changed, 53 insertions(+), 38 deletions(-)

[incubator-streampipes] 01/04: fixed issues with evaluation logging

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 98c810138f65e3bd5c2ae7db337029ccc9812da9
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Oct 8 09:40:12 2021 +0200

    fixed issues with evaluation logging
---
 .../streampipes/logging/evaluation/EvaluationLogger.java  |  2 +-
 .../policies/ThresholdViolationOffloadingPolicy.java      | 15 +++++++++++----
 .../org/apache/streampipes/performance/TestFactory.java   |  4 ++--
 .../performance/performancetest/GenericTest.java          | 15 +++++++++++++--
 4 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 19ccb4b..ccd0a7a 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -37,7 +37,7 @@ public class EvaluationLogger {
 
     private EvaluationLogger(){
         String loggingUrl = System.getenv("SP_LOGGING_MQTT_URL");
-        String nodeId = System.getenv("SP_LOGGING_MQTT_URL");
+        String nodeId = System.getenv("SP_NODE_CONTROLLER_ID");
         if (nodeId != null){
             this.deviceId = nodeId;
         }else {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index 39965d9..e7c60fb 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -54,6 +54,17 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
         if(!this.history.offer(value)) {
             this.history.poll();
             this.history.offer(value);
+            //Only for logging; can be removed later
+            if(value.compareTo(this.threshold) > 0){
+                int numViolations = 0;
+                for(T val : this.history){
+                    if(val.compareTo(this.threshold) > 0){
+                        numViolations++;
+                    }
+                }
+                Object[] line = {"policy violation #" + numViolations};
+                EvaluationLogger.getInstance().logMQTT("Offloading", line);
+            }
         }
     }
 
@@ -65,8 +76,6 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
                 for(T value : this.history){
                     if(value.compareTo(this.threshold) > 0){
                         numViolations++;
-                        Object[] line = {"policy violation #" + numViolations};
-                        EvaluationLogger.getInstance().logMQTT("Offloading", line);
                     }
                 }
                 break;
@@ -74,8 +83,6 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
                 for(T value : this.history){
                     if(value.compareTo(this.threshold) < 0){
                         numViolations++;
-                        Object[] line = {"policy violation #" + numViolations};
-                        EvaluationLogger.getInstance().logMQTT("Offloading", line);
                     }
                 }
                 break;
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 951a540..4cb6dbe 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -34,7 +34,7 @@ public class TestFactory {
             case "Latency":
                 return getLatencyTest();
             case "Migration":
-                Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode"};
+                Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode", "success"};
                 logger.logHeader("Migration", header_migration);
                 return getMigrationTest();
             case "Reconfiguration":
@@ -73,7 +73,7 @@ public class TestFactory {
 
     public static Test getOffloadingTest(){
         return new GenericTest(getPipelineName(), false, false,
-                true, 20000, 600000);
+                true, 20000, 1500000);
     }
 
     //Helpers
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 55ce48b..892a5c6 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -98,7 +98,7 @@ public class GenericTest implements Test{
             PipelineOperationStatus migrationMessage = client.pipelines().migrate(pipeline);
             long migrationDuration = System.nanoTime() - beforeMigration;
             if(testType.equals("Migration")){
-                line = new Object[]{System.currentTimeMillis(), "Migration duration", nrRuns, migrationDuration, migrationDuration/1000000000.0,migrationNodes.a, migrationNodes.b, true};
+                line = new Object[]{"Migration duration", nrRuns, migrationDuration, migrationDuration/1000000000.0,migrationNodes.a, migrationNodes.b, true};
             }
             System.out.println(migrationMessage.getTitle());
             if (!migrationMessage.isSuccess()) {
@@ -108,7 +108,8 @@ public class GenericTest implements Test{
         }
         //Reconfiguration
         if (shouldBeReconfigured) {
-            pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+            if (testType.equals("Reconfiguration"))
+                pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
                     .filter(FreeTextStaticProperty.class::isInstance)
                     .map(FreeTextStaticProperty.class::cast)
                     .filter(FreeTextStaticProperty::isReconfigurable)
@@ -117,6 +118,16 @@ public class GenericTest implements Test{
                             sp.setValue(Float.toString(this.reconfigurableValue++));
                         }
                     }));
+            else if (testType.equals("Offloading"))
+                pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+                        .filter(FreeTextStaticProperty.class::isInstance)
+                        .map(FreeTextStaticProperty.class::cast)
+                        .filter(FreeTextStaticProperty::isReconfigurable)
+                        .forEach(sp -> {
+                            if (sp.getInternalName().equals("load")) {
+                                sp.setValue(Float.toString(0.9f));
+                            }
+                        }));
             line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
             System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
             PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);

[incubator-streampipes] 04/04: update logger and fix logging inconsistencies

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit f037eff63036a2cdba14453c574d672c1a10da92
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 13 20:48:46 2021 +0200

    update logger and fix logging inconsistencies
---
 .../logging/evaluation/EvaluationLogger.java        | 21 ++++++++++++++-------
 .../controller/api/InvocableEntityResource.java     |  6 ++----
 .../offloading/OffloadingPolicyManager.java         |  9 +++------
 .../ThresholdViolationOffloadingPolicy.java         |  3 +--
 .../statscollector/DockerStatsCollector.java        |  2 +-
 .../apache/streampipes/performance/TestFactory.java |  2 +-
 .../pipeline/PipelineMigrationExecutor.java         | 15 +++++----------
 7 files changed, 27 insertions(+), 31 deletions(-)

diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index ccd0a7a..1f9807c 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets;
 
 public class EvaluationLogger {
     private static EvaluationLogger instance = null;
-    private final MQTT mqtt;
     private final BlockingConnection connection;
     private final String deviceId;
 
@@ -43,7 +42,7 @@ public class EvaluationLogger {
         }else {
             this.deviceId = "default";
         }
-        mqtt = new MQTT();
+        MQTT mqtt = new MQTT();
         try {
             mqtt.setHost(loggingUrl);
         } catch (URISyntaxException e) {
@@ -57,21 +56,29 @@ public class EvaluationLogger {
         }
     }
 
-    public void logMQTT(String topic, Object[] elements){
+    /**public void logMQTT(String topic, Object[] elements){
         String message = System.currentTimeMillis() + "," + this.deviceId + ",";
         for(Object element:elements)
             message += element + ",";
         message = message.substring(0, message.length()-1);
         publish(topic, message);
+    }**/
+
+    public void logMQTT(String topic, Object ... elements){
+        StringBuilder message = new StringBuilder(System.currentTimeMillis() + "," + this.deviceId + ",");
+        for(Object element:elements)
+            message.append(element).append(",");
+        message = new StringBuilder(message.substring(0, message.length() - 1));
+        publish(topic, message.toString());
     }
 
     public void logHeader(String topic, Object[] elements){
-        String message = "";
+        StringBuilder message = new StringBuilder();
         for(Object element:elements)
-            message += element + ",";
+            message.append(element).append(",");
         if (message.length() > 0)
-            message = message.substring(0, message.length()-1);
-        publish(topic, message);
+            message = new StringBuilder(message.substring(0, message.length() - 1));
+        publish(topic, message.toString());
     }
 
     private void publish(String topic, String message){
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
index b005ab7..f950fa5 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
@@ -84,8 +84,7 @@ public class InvocableEntityResource extends AbstractResource {
         //TODO: Remove Logger after debugging
         InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
         EvaluationLogger logger = EvaluationLogger.getInstance();
-        Object[] line = {"Element detached"};
-        logger.logMQTT("Offloading", line);
+        logger.logMQTT("Offloading", "Element detached");
         Response resp = PipelineElementManager.getInstance().detach(graph, runningInstanceId);
         RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
 
@@ -114,8 +113,7 @@ public class InvocableEntityResource extends AbstractResource {
                     .get(0))
                     .getValue();
         }
-        Object[] line = {"reconfiguration request received", nrRuns++, value};
-        logger.logMQTT("Reconfiguration", line);
+        logger.logMQTT("Reconfiguration", "reconfiguration request received", nrRuns++, value);
         InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
         return ok(PipelineElementManager.getInstance().reconfigure(graph, reconfigurationEntity));
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index 5d745ce..2fc1271 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -70,8 +70,7 @@ public class OffloadingPolicyManager {
             //Currently uses the first violated policy. Could be extended to take the degree of policy violation into
             // account
             //TODO: Remove Logger after debugging
-            Object[] line = {"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
-            EvaluationLogger.getInstance().logMQTT("Offloading", line);
+            EvaluationLogger.getInstance().logMQTT("Offloading", "offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName());
             triggerOffloading(violatedPolicies.get(0));
         }
         //Blacklist of entities is cleared when no policies were violated.
@@ -80,16 +79,14 @@ public class OffloadingPolicyManager {
 
     private void triggerOffloading(OffloadingStrategy strategy){
         InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
-        Object[] line = {"entity to offload selected"};
-        EvaluationLogger.getInstance().logMQTT("Offloading", line);
+        EvaluationLogger.getInstance().logMQTT("Offloading", "entity to offload selected");
         if(offloadEntity != null){
             Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
 
             String appId = offloadEntity.getAppId();
             String pipelineName = offloadEntity.getCorrespondingPipeline();
 
-            Object[] line_done = {"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
-            EvaluationLogger.getInstance().logMQTT("Offloading", line_done);
+            EvaluationLogger.getInstance().logMQTT("Offloading", "offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId);
 
             if(resp.isSuccess()){
                 LOG.info("Successfully offloaded: {} of pipeline: {}", appId, pipelineName);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index e7c60fb..95eb8ff 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -62,8 +62,7 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
                         numViolations++;
                     }
                 }
-                Object[] line = {"policy violation #" + numViolations};
-                EvaluationLogger.getInstance().logMQTT("Offloading", line);
+                EvaluationLogger.getInstance().logMQTT("Offloading", "policy violation #" + numViolations);
             }
         }
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
index 2d038fc..e812b02 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
@@ -72,7 +72,7 @@ public class DockerStatsCollector {
                 "netTxHumanReadable"
         };
 
-        EvaluationLogger.getInstance().logMQTT(LOGGING_TOPIC, header);
+        EvaluationLogger.getInstance().logHeader(LOGGING_TOPIC, header);
     }
 
     private final Runnable collect = () -> {
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 4cb6dbe..717095f 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -42,7 +42,7 @@ public class TestFactory {
                 logger.logHeader("Reconfiguration", header_reconfigure);
                 return getReconfigurationTest();
             case "Offloading":
-                Object[] header_offloading = {"timestampInMillis", "deviceId", "event"};
+                Object[] header_offloading = {"timestampInMillis", "deviceId", "event", "policy", "selectedProcessor"};
                 logger.logHeader("Offloading", header_offloading);
                 return getOffloadingTest();
             default:
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index 2de5bc3..e1f40e4 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -103,8 +103,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long start_target_duration = System.nanoTime() - before_start_target;
-        Object[] line_start_target = {"start target element","",start_target_duration,start_target_duration/1000000000.0};
-        logger.logMQTT("Migration", line_start_target);
+        logger.logMQTT("Migration", "start target element","",start_target_duration,start_target_duration/1000000000.0);
 
         // Stop relays from origin predecessor
         long downtime_beginning = System.nanoTime();
@@ -114,8 +113,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long stop_relays_origin_duration = System.nanoTime() - downtime_beginning;
-        Object[] line_stop_relay = {"stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0};
-        logger.logMQTT("Migration", line_stop_relay);
+        logger.logMQTT("Migration", "stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0);
 
         // Start relays to target after migration
         long before_start_relay_target = System.nanoTime();
@@ -125,12 +123,10 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long start_relay_target_duration = System.nanoTime() - before_start_relay_target;
-        Object[] line_start_relay = {"start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0};
-        logger.logMQTT("Migration", line_start_relay);
+        logger.logMQTT("Migration", "start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0);
 
         long downtime = System.nanoTime() - downtime_beginning;
-        Object[] line_downtime = {"downtime", "", downtime, downtime/1000000000.0};
-        logger.logMQTT("Migration", line_downtime);
+        logger.logMQTT("Migration", "downtime", "", downtime, downtime/1000000000.0);
 
         //Stop origin and associated relay
         long before_stop_origin = System.nanoTime();
@@ -140,8 +136,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
             return status;
         }
         long stop_origin_duration = System.nanoTime() - before_stop_origin;
-        Object[] line_stop_origin = {"stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0};
-        logger.logMQTT("Migration", line_stop_origin);
+        logger.logMQTT("Migration", "stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0);
 
         List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
         graphs.addAll(pipeline.getActions());

[incubator-streampipes] 03/04: Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 9010d4902ae439934c920cd54ee78c97604b08ab
Merge: d2bfbd1 e715c66
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 13 20:46:17 2021 +0200

    Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions

 .../streampipes/connect/adapter/Adapter.java       |  9 +--
 .../connect/adapter/GroundingService.java          | 26 ++------
 .../connect/adapter/NodeControllerService.java     | 74 ++++++++++++++++++++++
 .../docker/AbstractStreamPipesDockerContainer.java |  2 +
 .../components/pipeline/pipeline.component.ts      |  7 +-
 ui/src/app/editor/services/jsplumb.service.ts      | 24 +++++--
 6 files changed, 106 insertions(+), 36 deletions(-)

[incubator-streampipes] 02/04: introduced policy reset after successful offloading

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit d2bfbd106e4dd1bce144c246f8b75630b7af54ac
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Oct 8 09:41:28 2021 +0200

    introduced policy reset after successful offloading
---
 .../node/controller/management/offloading/OffloadingPolicyManager.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index 73ae24e..5d745ce 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -93,6 +93,7 @@ public class OffloadingPolicyManager {
 
             if(resp.isSuccess()){
                 LOG.info("Successfully offloaded: {} of pipeline: {}", appId, pipelineName);
+                strategy.getOffloadingPolicy().reset();
             } else{
                 LOG.warn("Failed to offload: {} of pipeline: {}", appId, pipelineName);
                 unsuccessfullyTriedEntities.add(offloadEntity);