You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/13 18:55:56 UTC
[incubator-streampipes] branch edge-extensions updated (e715c66 -> f037eff)
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a change to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.
from e715c66 [hotfix] support series connection of identical pipeline elements and call node controller for configuring connect adapters
new 98c8101 fixed issues with evaluation logging
new d2bfbd1 introduced policy reset after successful offloading
new 9010d49 Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions
new f037eff update logger and fix logging inconsistencies
The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../logging/evaluation/EvaluationLogger.java | 23 ++++++++++++++--------
.../controller/api/InvocableEntityResource.java | 6 ++----
.../offloading/OffloadingPolicyManager.java | 10 ++++------
.../ThresholdViolationOffloadingPolicy.java | 14 +++++++++----
.../statscollector/DockerStatsCollector.java | 2 +-
.../streampipes/performance/TestFactory.java | 6 +++---
.../performance/performancetest/GenericTest.java | 15 ++++++++++++--
.../pipeline/PipelineMigrationExecutor.java | 15 +++++---------
8 files changed, 53 insertions(+), 38 deletions(-)
[incubator-streampipes] 01/04: fixed issues with evaluation logging
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 98c810138f65e3bd5c2ae7db337029ccc9812da9
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Oct 8 09:40:12 2021 +0200
fixed issues with evaluation logging
---
.../streampipes/logging/evaluation/EvaluationLogger.java | 2 +-
.../policies/ThresholdViolationOffloadingPolicy.java | 15 +++++++++++----
.../org/apache/streampipes/performance/TestFactory.java | 4 ++--
.../performance/performancetest/GenericTest.java | 15 +++++++++++++--
4 files changed, 27 insertions(+), 9 deletions(-)
diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index 19ccb4b..ccd0a7a 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -37,7 +37,7 @@ public class EvaluationLogger {
private EvaluationLogger(){
String loggingUrl = System.getenv("SP_LOGGING_MQTT_URL");
- String nodeId = System.getenv("SP_LOGGING_MQTT_URL");
+ String nodeId = System.getenv("SP_NODE_CONTROLLER_ID");
if (nodeId != null){
this.deviceId = nodeId;
}else {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index 39965d9..e7c60fb 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -54,6 +54,17 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
if(!this.history.offer(value)) {
this.history.poll();
this.history.offer(value);
+ //Only for logging; can be removed later
+ if(value.compareTo(this.threshold) > 0){
+ int numViolations = 0;
+ for(T val : this.history){
+ if(val.compareTo(this.threshold) > 0){
+ numViolations++;
+ }
+ }
+ Object[] line = {"policy violation #" + numViolations};
+ EvaluationLogger.getInstance().logMQTT("Offloading", line);
+ }
}
}
@@ -65,8 +76,6 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
for(T value : this.history){
if(value.compareTo(this.threshold) > 0){
numViolations++;
- Object[] line = {"policy violation #" + numViolations};
- EvaluationLogger.getInstance().logMQTT("Offloading", line);
}
}
break;
@@ -74,8 +83,6 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
for(T value : this.history){
if(value.compareTo(this.threshold) < 0){
numViolations++;
- Object[] line = {"policy violation #" + numViolations};
- EvaluationLogger.getInstance().logMQTT("Offloading", line);
}
}
break;
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 951a540..4cb6dbe 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -34,7 +34,7 @@ public class TestFactory {
case "Latency":
return getLatencyTest();
case "Migration":
- Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode"};
+ Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode", "success"};
logger.logHeader("Migration", header_migration);
return getMigrationTest();
case "Reconfiguration":
@@ -73,7 +73,7 @@ public class TestFactory {
public static Test getOffloadingTest(){
return new GenericTest(getPipelineName(), false, false,
- true, 20000, 600000);
+ true, 20000, 1500000);
}
//Helpers
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 55ce48b..892a5c6 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -98,7 +98,7 @@ public class GenericTest implements Test{
PipelineOperationStatus migrationMessage = client.pipelines().migrate(pipeline);
long migrationDuration = System.nanoTime() - beforeMigration;
if(testType.equals("Migration")){
- line = new Object[]{System.currentTimeMillis(), "Migration duration", nrRuns, migrationDuration, migrationDuration/1000000000.0,migrationNodes.a, migrationNodes.b, true};
+ line = new Object[]{"Migration duration", nrRuns, migrationDuration, migrationDuration/1000000000.0,migrationNodes.a, migrationNodes.b, true};
}
System.out.println(migrationMessage.getTitle());
if (!migrationMessage.isSuccess()) {
@@ -108,7 +108,8 @@ public class GenericTest implements Test{
}
//Reconfiguration
if (shouldBeReconfigured) {
- pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+ if (testType.equals("Reconfiguration"))
+ pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
.filter(FreeTextStaticProperty.class::isInstance)
.map(FreeTextStaticProperty.class::cast)
.filter(FreeTextStaticProperty::isReconfigurable)
@@ -117,6 +118,16 @@ public class GenericTest implements Test{
sp.setValue(Float.toString(this.reconfigurableValue++));
}
}));
+ else if (testType.equals("Offloading"))
+ pipeline.getSepas().forEach(p -> p.getStaticProperties().stream()
+ .filter(FreeTextStaticProperty.class::isInstance)
+ .map(FreeTextStaticProperty.class::cast)
+ .filter(FreeTextStaticProperty::isReconfigurable)
+ .forEach(sp -> {
+ if (sp.getInternalName().equals("load")) {
+ sp.setValue(Float.toString(0.9f));
+ }
+ }));
line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true};
System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1));
PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
[incubator-streampipes] 04/04: update logger and fix logging inconsistencies
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit f037eff63036a2cdba14453c574d672c1a10da92
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 13 20:48:46 2021 +0200
update logger and fix logging inconsistencies
---
.../logging/evaluation/EvaluationLogger.java | 21 ++++++++++++++-------
.../controller/api/InvocableEntityResource.java | 6 ++----
.../offloading/OffloadingPolicyManager.java | 9 +++------
.../ThresholdViolationOffloadingPolicy.java | 3 +--
.../statscollector/DockerStatsCollector.java | 2 +-
.../apache/streampipes/performance/TestFactory.java | 2 +-
.../pipeline/PipelineMigrationExecutor.java | 15 +++++----------
7 files changed, 27 insertions(+), 31 deletions(-)
diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index ccd0a7a..1f9807c 100644
--- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets;
public class EvaluationLogger {
private static EvaluationLogger instance = null;
- private final MQTT mqtt;
private final BlockingConnection connection;
private final String deviceId;
@@ -43,7 +42,7 @@ public class EvaluationLogger {
}else {
this.deviceId = "default";
}
- mqtt = new MQTT();
+ MQTT mqtt = new MQTT();
try {
mqtt.setHost(loggingUrl);
} catch (URISyntaxException e) {
@@ -57,21 +56,29 @@ public class EvaluationLogger {
}
}
- public void logMQTT(String topic, Object[] elements){
+ /**public void logMQTT(String topic, Object[] elements){
String message = System.currentTimeMillis() + "," + this.deviceId + ",";
for(Object element:elements)
message += element + ",";
message = message.substring(0, message.length()-1);
publish(topic, message);
+ }**/
+
+ public void logMQTT(String topic, Object ... elements){
+ StringBuilder message = new StringBuilder(System.currentTimeMillis() + "," + this.deviceId + ",");
+ for(Object element:elements)
+ message.append(element).append(",");
+ message = new StringBuilder(message.substring(0, message.length() - 1));
+ publish(topic, message.toString());
}
public void logHeader(String topic, Object[] elements){
- String message = "";
+ StringBuilder message = new StringBuilder();
for(Object element:elements)
- message += element + ",";
+ message.append(element).append(",");
if (message.length() > 0)
- message = message.substring(0, message.length()-1);
- publish(topic, message);
+ message = new StringBuilder(message.substring(0, message.length() - 1));
+ publish(topic, message.toString());
}
private void publish(String topic, String message){
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
index b005ab7..f950fa5 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
@@ -84,8 +84,7 @@ public class InvocableEntityResource extends AbstractResource {
//TODO: Remove Logger after debugging
InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
EvaluationLogger logger = EvaluationLogger.getInstance();
- Object[] line = {"Element detached"};
- logger.logMQTT("Offloading", line);
+ logger.logMQTT("Offloading", "Element detached");
Response resp = PipelineElementManager.getInstance().detach(graph, runningInstanceId);
RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
@@ -114,8 +113,7 @@ public class InvocableEntityResource extends AbstractResource {
.get(0))
.getValue();
}
- Object[] line = {"reconfiguration request received", nrRuns++, value};
- logger.logMQTT("Reconfiguration", line);
+ logger.logMQTT("Reconfiguration", "reconfiguration request received", nrRuns++, value);
InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId);
return ok(PipelineElementManager.getInstance().reconfigure(graph, reconfigurationEntity));
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index 5d745ce..2fc1271 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -70,8 +70,7 @@ public class OffloadingPolicyManager {
//Currently uses the first violated policy. Could be extended to take the degree of policy violation into
// account
//TODO: Remove Logger after debugging
- Object[] line = {"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
- EvaluationLogger.getInstance().logMQTT("Offloading", line);
+ EvaluationLogger.getInstance().logMQTT("Offloading", "offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName());
triggerOffloading(violatedPolicies.get(0));
}
//Blacklist of entities is cleared when no policies were violated.
@@ -80,16 +79,14 @@ public class OffloadingPolicyManager {
private void triggerOffloading(OffloadingStrategy strategy){
InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
- Object[] line = {"entity to offload selected"};
- EvaluationLogger.getInstance().logMQTT("Offloading", line);
+ EvaluationLogger.getInstance().logMQTT("Offloading", "entity to offload selected");
if(offloadEntity != null){
Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
String appId = offloadEntity.getAppId();
String pipelineName = offloadEntity.getCorrespondingPipeline();
- Object[] line_done = {"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
- EvaluationLogger.getInstance().logMQTT("Offloading", line_done);
+ EvaluationLogger.getInstance().logMQTT("Offloading", "offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId);
if(resp.isSuccess()){
LOG.info("Successfully offloaded: {} of pipeline: {}", appId, pipelineName);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index e7c60fb..95eb8ff 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -62,8 +62,7 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme
numViolations++;
}
}
- Object[] line = {"policy violation #" + numViolations};
- EvaluationLogger.getInstance().logMQTT("Offloading", line);
+ EvaluationLogger.getInstance().logMQTT("Offloading", "policy violation #" + numViolations);
}
}
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
index 2d038fc..e812b02 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
@@ -72,7 +72,7 @@ public class DockerStatsCollector {
"netTxHumanReadable"
};
- EvaluationLogger.getInstance().logMQTT(LOGGING_TOPIC, header);
+ EvaluationLogger.getInstance().logHeader(LOGGING_TOPIC, header);
}
private final Runnable collect = () -> {
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 4cb6dbe..717095f 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -42,7 +42,7 @@ public class TestFactory {
logger.logHeader("Reconfiguration", header_reconfigure);
return getReconfigurationTest();
case "Offloading":
- Object[] header_offloading = {"timestampInMillis", "deviceId", "event"};
+ Object[] header_offloading = {"timestampInMillis", "deviceId", "event", "policy", "selectedProcessor"};
logger.logHeader("Offloading", header_offloading);
return getOffloadingTest();
default:
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index 2de5bc3..e1f40e4 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -103,8 +103,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
return status;
}
long start_target_duration = System.nanoTime() - before_start_target;
- Object[] line_start_target = {"start target element","",start_target_duration,start_target_duration/1000000000.0};
- logger.logMQTT("Migration", line_start_target);
+ logger.logMQTT("Migration", "start target element","",start_target_duration,start_target_duration/1000000000.0);
// Stop relays from origin predecessor
long downtime_beginning = System.nanoTime();
@@ -114,8 +113,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
return status;
}
long stop_relays_origin_duration = System.nanoTime() - downtime_beginning;
- Object[] line_stop_relay = {"stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0};
- logger.logMQTT("Migration", line_stop_relay);
+ logger.logMQTT("Migration", "stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0);
// Start relays to target after migration
long before_start_relay_target = System.nanoTime();
@@ -125,12 +123,10 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
return status;
}
long start_relay_target_duration = System.nanoTime() - before_start_relay_target;
- Object[] line_start_relay = {"start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0};
- logger.logMQTT("Migration", line_start_relay);
+ logger.logMQTT("Migration", "start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0);
long downtime = System.nanoTime() - downtime_beginning;
- Object[] line_downtime = {"downtime", "", downtime, downtime/1000000000.0};
- logger.logMQTT("Migration", line_downtime);
+ logger.logMQTT("Migration", "downtime", "", downtime, downtime/1000000000.0);
//Stop origin and associated relay
long before_stop_origin = System.nanoTime();
@@ -140,8 +136,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
return status;
}
long stop_origin_duration = System.nanoTime() - before_stop_origin;
- Object[] line_stop_origin = {"stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0};
- logger.logMQTT("Migration", line_stop_origin);
+ logger.logMQTT("Migration", "stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0);
List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
graphs.addAll(pipeline.getActions());
[incubator-streampipes] 03/04: Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 9010d4902ae439934c920cd54ee78c97604b08ab
Merge: d2bfbd1 e715c66
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Wed Oct 13 20:46:17 2021 +0200
Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions
.../streampipes/connect/adapter/Adapter.java | 9 +--
.../connect/adapter/GroundingService.java | 26 ++------
.../connect/adapter/NodeControllerService.java | 74 ++++++++++++++++++++++
.../docker/AbstractStreamPipesDockerContainer.java | 2 +
.../components/pipeline/pipeline.component.ts | 7 +-
ui/src/app/editor/services/jsplumb.service.ts | 24 +++++--
6 files changed, 106 insertions(+), 36 deletions(-)
[incubator-streampipes] 02/04: introduced policy reset after successful offloading
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit d2bfbd106e4dd1bce144c246f8b75630b7af54ac
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Oct 8 09:41:28 2021 +0200
introduced policy reset after successful offloading
---
.../node/controller/management/offloading/OffloadingPolicyManager.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index 73ae24e..5d745ce 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -93,6 +93,7 @@ public class OffloadingPolicyManager {
if(resp.isSuccess()){
LOG.info("Successfully offloaded: {} of pipeline: {}", appId, pipelineName);
+ strategy.getOffloadingPolicy().reset();
} else{
LOG.warn("Failed to offload: {} of pipeline: {}", appId, pipelineName);
unsuccessfullyTriedEntities.add(offloadEntity);