You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/05/12 16:21:29 UTC
[incubator-streampipes] branch edge-extensions updated: Fixed
issues with auto offloading
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new 6d017b7 Fixed issues with auto offloading
6d017b7 is described below
commit 6d017b7579035608368d087c02bc17160ab211ce
Author: Daniel Gomm <da...@outlook.de>
AuthorDate: Wed May 12 18:20:54 2021 +0200
Fixed issues with auto offloading
---
.../offloading/OffloadingPolicyManager.java | 37 +++++++++----
.../strategies/OffloadingStrategyFactory.java | 63 ++++++++++++++++------
.../selection/CPULoadSelectionStrategy.java | 4 +-
.../selection/PrioritySelectionStrategy.java | 17 +++---
.../selection/RandomSelectionStrategy.java | 2 +-
.../strategies/selection/SelectionStrategy.java | 4 +-
.../handler/PipelineElementOffloadingHandler.java | 3 +-
.../management/resource/ResourceManager.java | 2 +-
.../node/controller/utils/HttpUtils.java | 32 +++++++++++
.../migration/MigrationPipelineGenerator.java | 21 ++++----
10 files changed, 138 insertions(+), 47 deletions(-)
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index a17ab5c..372c470 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -42,6 +42,7 @@ public class OffloadingPolicyManager {
private static final String NODE_MANAGEMENT_ONLINE_ENDPOINT = BACKEND_BASE_ROUTE + "/nodes/online";
private final List<OffloadingStrategy<?>> offloadingStrategies = new ArrayList<>();
+ private final List<InvocableStreamPipesEntity> unsuccessfullyTriedEntities = new ArrayList<>();
private static OffloadingPolicyManager instance;
private static final Logger LOG = LoggerFactory.getLogger(OffloadingPolicyManager.class.getCanonicalName());
@@ -53,26 +54,44 @@ public class OffloadingPolicyManager {
}
public void checkPolicies(ResourceMetrics rm){
+ List<OffloadingStrategy<?>> violatedPolicies = new ArrayList<>();
for(OffloadingStrategy strategy : offloadingStrategies){
strategy.getOffloadingPolicy().addValue(strategy.getResourceProperty().getProperty(rm));
if(strategy.getOffloadingPolicy().isViolated()){
- InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select();
- if(offloadEntity != null){
- Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
- if(resp.isSuccess())
- LOG.info("Successfully offloaded: " + offloadEntity.getAppId()
- + " of pipeline: " + offloadEntity.getCorrespondingPipeline());
- else LOG.info("Failed to offload: " + offloadEntity.getAppId()
- + " of pipeline: " + offloadEntity.getCorrespondingPipeline());
- } else LOG.info("No pipeline element found to offload");
+ violatedPolicies.add(strategy);
}
}
+ if(!violatedPolicies.isEmpty())
+ //Currently uses the first violated policy. Could be extended to take the degree of policy violation into
+ // account
+ triggerOffloading(violatedPolicies.get(0));
+ //Blacklist of entities is cleared when no policies were violated.
+ else unsuccessfullyTriedEntities.clear();
+ }
+
+ private void triggerOffloading(OffloadingStrategy strategy){
+ InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
+ if(offloadEntity != null){
+ Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
+ if(resp.isSuccess()){
+ LOG.info("Successfully offloaded: " + offloadEntity.getAppId()
+ + " of pipeline: " + offloadEntity.getCorrespondingPipeline());
+ } else{
+ LOG.info("Failed to offload: " + offloadEntity.getAppId()
+ + " of pipeline: " + offloadEntity.getCorrespondingPipeline());
+ unsuccessfullyTriedEntities.add(offloadEntity);
+ }
+ } else LOG.info("No pipeline element found to offload");
}
public void addOffloadingStrategy(OffloadingStrategy<?> offloadingStrategy){
this.offloadingStrategies.add(offloadingStrategy);
}
+ public void addOffloadingStrategies(List<OffloadingStrategy<?>> offloadingStrategies){
+ offloadingStrategies.forEach(this::addOffloadingStrategy);
+ }
+
public List<NodeInfoDescription> getOnlineNodes(){
String endpoint = generateNodeManagementOnlineNodesEndpoint();
return HttpUtils.get(endpoint, new TypeReference<List<NodeInfoDescription>>(){});
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
index cbe5b63..4def3fc 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.node.controller.management.offloading.strategies;
import org.apache.streampipes.node.controller.config.NodeConfiguration;
+import org.apache.streampipes.node.controller.management.node.NodeManager;
import org.apache.streampipes.node.controller.management.offloading.strategies.policies.Comparator;
import org.apache.streampipes.node.controller.management.offloading.strategies.policies.ThresholdViolationOffloadingPolicy;
import org.apache.streampipes.node.controller.management.offloading.strategies.property.CPULoadResourceProperty;
@@ -27,35 +28,63 @@ import org.apache.streampipes.node.controller.management.offloading.strategies.p
import org.apache.streampipes.node.controller.management.offloading.strategies.selection.PrioritySelectionStrategy;
import org.apache.streampipes.node.controller.management.offloading.strategies.selection.RandomSelectionStrategy;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
public class OffloadingStrategyFactory {
- public OffloadingStrategy select(){
+ public List<OffloadingStrategy<?>> select(){
switch (NodeConfiguration.getAutoOffloadingStrategy()) {
case CPU:
- return new OffloadingStrategy<Float>(
- new ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER, 90f, 5),
- new CPULoadResourceProperty(), new RandomSelectionStrategy());
+ return Collections.singletonList(getDefaultCPUOffloadingPolicy());
case MEM:
- return new OffloadingStrategy<Long>(
- new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
- 549755813888L, 5),
- new FreeMemoryResourceProperty(), new RandomSelectionStrategy());
+ return Collections.singletonList(getDefaultMemoryOffloadingPolicy());
case DISK:
- return new OffloadingStrategy<Long>(
- new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
- 549755813888L, 5),
- new FreeDiskSpaceResourceProperty(), new RandomSelectionStrategy());
+ return Collections.singletonList(getDefaultDiskSpaceOffloadingPolicy());
case DEBUG:
- return new OffloadingStrategy<Float>(
+ return Collections.singletonList(new OffloadingStrategy<Float>(
new ThresholdViolationOffloadingPolicy<>(5,
Comparator.GREATER, 0.5f, 1),
- new CPULoadResourceProperty(), new PrioritySelectionStrategy());
+ new CPULoadResourceProperty(), new PrioritySelectionStrategy()));
default:
- return new OffloadingStrategy<Float>(
- new ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER, 90f, 4),
- new CPULoadResourceProperty(), new RandomSelectionStrategy());
+ return getDefaultStrategy();
}
}
+ private List<OffloadingStrategy<?>> getDefaultStrategy(){
+ List<OffloadingStrategy<?>> offloadingStrategies = new ArrayList<>();
+ offloadingStrategies.add(getDefaultCPUOffloadingPolicy());
+ offloadingStrategies.add(getDefaultMemoryOffloadingPolicy());
+ offloadingStrategies.add(getDefaultDiskSpaceOffloadingPolicy());
+ return offloadingStrategies;
+ }
+
+ private OffloadingStrategy<Float> getDefaultCPUOffloadingPolicy(){
+ return new OffloadingStrategy<>(
+ new ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER, 90f, 4),
+ new CPULoadResourceProperty(), new RandomSelectionStrategy());
+ }
+
+ private OffloadingStrategy<Long> getDefaultMemoryOffloadingPolicy(){
+ long totalMemory = NodeManager.getInstance().getNodeInfoDescription()
+ .getNodeResources().getHardwareResource().getMemory().getMemTotal();
+ long threshold = (long) (totalMemory * 0.15);
+ return new OffloadingStrategy<>(
+ new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
+ threshold, 4),
+ new FreeMemoryResourceProperty(), new RandomSelectionStrategy());
+ }
+
+ private OffloadingStrategy<Long> getDefaultDiskSpaceOffloadingPolicy(){
+ long totalDisk = NodeManager.getInstance().getNodeInfoDescription()
+ .getNodeResources().getHardwareResource().getDisk().getDiskTotal();
+ long threshold = (long) (totalDisk * 0.1);
+ return new OffloadingStrategy<>(
+ new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
+ threshold, 4),
+ new FreeMemoryResourceProperty(), new RandomSelectionStrategy());
+ }
+
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/CPULoadSelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/CPULoadSelectionStrategy.java
index 8658ae3..8bd7e20 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/CPULoadSelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/CPULoadSelectionStrategy.java
@@ -22,12 +22,12 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
+import java.util.List;
public class CPULoadSelectionStrategy implements SelectionStrategy{
@Override
- public InvocableStreamPipesEntity select() {
+ public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
//TODO: Migrate the PE with the highest load on CPU (or possibly other resource)
- ThreadMXBean bean = ManagementFactory.getThreadMXBean();
return null;
}
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
index 44ad591..e52ec72 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
@@ -32,22 +32,22 @@ import java.util.stream.Collectors;
public class PrioritySelectionStrategy implements SelectionStrategy{
@Override
- public InvocableStreamPipesEntity select() {
+ public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
List<InvocableStreamPipesEntity> runningPipelineElements = RunningInvocableInstances.INSTANCE.getAll();
//List all other nodes that are online
- // TODO: this involves request to central backend
- List<NodeInfoDescription> onlineNodes = getNodeInfos().stream().filter(desc ->
+ // TODO: this involves request to central backend (deactivated; assess if provides meaningful benefit)
+ /*List<NodeInfoDescription> onlineNodes = getNodeInfos().stream().filter(desc ->
!desc.getNodeControllerId().equals(NodeConfiguration.getNodeControllerId()))
- .collect(Collectors.toList());
+ .collect(Collectors.toList());*/
List<InvocableStreamPipesEntity> candidatesForOffloading = runningPipelineElements.stream()
+ .filter(entity -> isNotBlacklisted(entity, blacklistedEntities))
.filter(InvocableStreamPipesEntity::isPreemption)
- .filter(entity -> verifyIfSupportedOnOtherNodes(entity, onlineNodes))
+ //.filter(entity -> verifyIfSupportedOnOtherNodes(entity, onlineNodes))
.collect(Collectors.toList());
if (candidatesForOffloading.isEmpty()) {
- // TODO what to do when null?
return null;
} else {
return selectPipelineElementWithLowestPriorityScore(candidatesForOffloading);
@@ -76,4 +76,9 @@ public class PrioritySelectionStrategy implements SelectionStrategy{
}
return !candidateNodes.isEmpty();
}
+
+ private boolean isNotBlacklisted(InvocableStreamPipesEntity entity, List<InvocableStreamPipesEntity> blacklist){
+ return blacklist.stream().noneMatch(blacklistedEntity ->
+ blacklistedEntity.getDeploymentRunningInstanceId().equals(entity.getDeploymentRunningInstanceId()));
+ }
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
index a37289c..e1de78c 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
@@ -27,7 +27,7 @@ import java.util.Random;
public class RandomSelectionStrategy implements SelectionStrategy{
@Override
- public InvocableStreamPipesEntity select() {
+ public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll();
if(instances.size() == 0)
return null;
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
index 1964198..54ed1f8 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
@@ -20,6 +20,8 @@ package org.apache.streampipes.node.controller.management.offloading.strategies.
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import java.util.List;
+
public interface SelectionStrategy {
- InvocableStreamPipesEntity select();
+ InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities);
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementOffloadingHandler.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementOffloadingHandler.java
index 59e6e34..7b8dc5e 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementOffloadingHandler.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementOffloadingHandler.java
@@ -41,7 +41,8 @@ public class PipelineElementOffloadingHandler implements IHandler<Response> {
@Override
public Response handle() {
String url = generatePipelineManagementOffloadEndpoint();
- return HttpUtils.post(url, graph);
+ String bearerToken = NodeConfiguration.getNodeApiKey();
+ return HttpUtils.post(url, bearerToken, graph, Response.class);
}
private String generatePipelineManagementOffloadEndpoint() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
index 0107388..5710a5c 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
@@ -49,7 +49,7 @@ public class ResourceManager {
private ResourceManager() {
//Offloading Policy
- OffloadingPolicyManager.getInstance().addOffloadingStrategy(new OffloadingStrategyFactory().select());
+ OffloadingPolicyManager.getInstance().addOffloadingStrategies(new OffloadingStrategyFactory().select());
}
public static ResourceManager getInstance() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
index 71beb71..f1e0536 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
@@ -166,6 +166,35 @@ public class HttpUtils {
}
}
+ public static <T1, T2> T2 post(String url, String bearerToken, T1 object, Class<T2> clazz) {
+ String body = serialize(object);
+ try {
+ Response response = Request.Post(url)
+ .bodyString(body, ContentType.APPLICATION_JSON)
+ .addHeader("Authorization", "Bearer " + bearerToken)
+ .connectTimeout(CONNECT_TIMEOUT)
+ .execute();
+
+ if (clazz.isInstance(Response.class)) {
+ return (T2) response;
+ } else if (clazz.isInstance(String.class)) {
+ return (T2) response.returnContent().asString();
+ } else if (clazz.isAssignableFrom(byte[].class)) {
+ return (T2) response.returnContent().asBytes();
+ } else if (clazz.isAssignableFrom(Boolean.class)) {
+ return (T2) Boolean.TRUE;
+ } else {
+ return deserialize(response, clazz);
+ }
+ } catch (IOException e) {
+ if (clazz.isAssignableFrom(Boolean.class)) {
+ return (T2) Boolean.FALSE;
+ }
+
+ throw new SpRuntimeException("Something went wrong during POST request", e);
+ }
+ }
+
public static <T> org.apache.streampipes.model.Response post(String url, T object) {
String body = serialize(object);
org.apache.streampipes.model.Response response = new org.apache.streampipes.model.Response();
@@ -243,6 +272,9 @@ public class HttpUtils {
public static <T>T deserialize(Response response, Class<T> clazz) {
try {
String responseString = response.returnContent().asString();
+ //TODO: Fix issue when starting connect adapters (& remove quick fix)
+ if(clazz.equals(String.class))
+ return (T) responseString;
return JacksonSerializer
.getObjectMapper()
.readValue(responseString, clazz);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index 2d01aad..ad5614a 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@ -61,10 +61,10 @@ public class MigrationPipelineGenerator {
eligibleTargetNodes = filterResourceUtilization(eligibleTargetNodes);
//Different strategies possible (atm cancel offloading)
- if(eligibleTargetNodes == null || eligibleTargetNodes.isEmpty())
+ if(eligibleTargetNodes.isEmpty())
return null;
- //Random Selection of new node within the remaining eligibile nodes
+ //Random Selection of new node within the remaining eligible nodes
randomSelectionAndUpdate(eligibleTargetNodes);
return generateTargetPipeline();
@@ -72,14 +72,11 @@ public class MigrationPipelineGenerator {
private List<NodeInfoDescription> findEligibleTargetNodes(){
- List<NodeInfoDescription> eligibileTargetNodes = new ArrayList<>();
List<NodeInfoDescription> onlineNodes = NodeManagement.getInstance().getOnlineNodes();
- onlineNodes.stream()
+ return onlineNodes.stream()
.filter(this::matchAndVerify)
- .map(eligibileTargetNodes::add);
-
- return eligibileTargetNodes;
+ .collect(Collectors.toList());
}
private boolean matchAndVerify(NodeInfoDescription node) {
@@ -111,8 +108,12 @@ public class MigrationPipelineGenerator {
String nodeControllerId = nodeInfo.getNodeControllerId();
Queue<ResourceMetrics> rmHistory = ClusterResourceMonitor.getNodeResourceMetricsById(nodeControllerId);
- if(rmHistory == null)
- return null;
+ if(rmHistory == null){
+ //If no RessourceMetrics history is available (e.g. shorly after Backend start), consider node as
+ // possible target node
+ filteredTargetNodes.add(nodeInfo);
+ continue;
+ }
Hardware hardware = entityToMigrate.getResourceRequirements().stream()
.filter(nodeRR -> nodeRR instanceof Hardware)
@@ -127,6 +128,8 @@ public class MigrationPipelineGenerator {
&& hardware.getMemory() <= MEM_MULTIPLICATION_FACTOR * rmHistory.peek().getFreeMemoryInBytes()) {
filteredTargetNodes.add(nodeInfo);
}
+ } else{
+ filteredTargetNodes.add(nodeInfo);
}
}
return filteredTargetNodes;