You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/05/12 16:21:29 UTC

[incubator-streampipes] branch edge-extensions updated: Fixed issues with auto offloading

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new 6d017b7  Fixed issues with auto offloading
6d017b7 is described below

commit 6d017b7579035608368d087c02bc17160ab211ce
Author: Daniel Gomm <da...@outlook.de>
AuthorDate: Wed May 12 18:20:54 2021 +0200

    Fixed issues with auto offloading
---
 .../offloading/OffloadingPolicyManager.java        | 37 +++++++++----
 .../strategies/OffloadingStrategyFactory.java      | 63 ++++++++++++++++------
 .../selection/CPULoadSelectionStrategy.java        |  4 +-
 .../selection/PrioritySelectionStrategy.java       | 17 +++---
 .../selection/RandomSelectionStrategy.java         |  2 +-
 .../strategies/selection/SelectionStrategy.java    |  4 +-
 .../handler/PipelineElementOffloadingHandler.java  |  3 +-
 .../management/resource/ResourceManager.java       |  2 +-
 .../node/controller/utils/HttpUtils.java           | 32 +++++++++++
 .../migration/MigrationPipelineGenerator.java      | 21 ++++----
 10 files changed, 138 insertions(+), 47 deletions(-)

diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index a17ab5c..372c470 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -42,6 +42,7 @@ public class OffloadingPolicyManager {
     private static final String NODE_MANAGEMENT_ONLINE_ENDPOINT = BACKEND_BASE_ROUTE + "/nodes/online";
 
     private final List<OffloadingStrategy<?>> offloadingStrategies = new ArrayList<>();
+    private final List<InvocableStreamPipesEntity> unsuccessfullyTriedEntities = new ArrayList<>();
     private static OffloadingPolicyManager instance;
     private static final Logger LOG = LoggerFactory.getLogger(OffloadingPolicyManager.class.getCanonicalName());
 
@@ -53,26 +54,44 @@ public class OffloadingPolicyManager {
     }
 
     public void checkPolicies(ResourceMetrics rm){
+        List<OffloadingStrategy<?>> violatedPolicies = new ArrayList<>();
         for(OffloadingStrategy strategy : offloadingStrategies){
             strategy.getOffloadingPolicy().addValue(strategy.getResourceProperty().getProperty(rm));
             if(strategy.getOffloadingPolicy().isViolated()){
-                InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select();
-                if(offloadEntity != null){
-                    Response resp = PipelineElementManager.getInstance().offload(offloadEntity);
-                    if(resp.isSuccess())
-                        LOG.info("Successfully offloaded: " + offloadEntity.getAppId()
-                                + " of pipeline: " + offloadEntity.getCorrespondingPipeline());
-                    else LOG.info("Failed to offload: " + offloadEntity.getAppId()
-                            + " of pipeline: " + offloadEntity.getCorrespondingPipeline());
-                } else LOG.info("No pipeline element found to offload");
+                violatedPolicies.add(strategy);
             }
         }
+        if(!violatedPolicies.isEmpty())
+            //Currently uses the first violated policy. Could be extended to take the degree of policy violation into
+            // account
+            triggerOffloading(violatedPolicies.get(0));
+        //Blacklist of entities is cleared when no policies were violated.
+        else unsuccessfullyTriedEntities.clear();
+    }
+
+    /**
+     * Lets the selection strategy of the given offloading strategy pick a pipeline element and requests
+     * its offloading from the {@code PipelineElementManager}.
+     * Entities whose offloading request was not successful are appended to
+     * {@code unsuccessfullyTriedEntities}, the list that is handed to the selection strategy as blacklist.
+     *
+     * @param strategy the offloading strategy whose selection strategy chooses the entity to offload
+     */
+    private void triggerOffloading(OffloadingStrategy strategy){
+        InvocableStreamPipesEntity candidate =
+                strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
+        if(candidate == null){
+            LOG.info("No pipeline element found to offload");
+            return;
+        }
+
+        Response offloadResponse = PipelineElementManager.getInstance().offload(candidate);
+        if(!offloadResponse.isSuccess()){
+            LOG.info("Failed to offload: " + candidate.getAppId()
+                    + " of pipeline: " + candidate.getCorrespondingPipeline());
+            // remember the failed entity so that it is part of the blacklist on the next attempt
+            unsuccessfullyTriedEntities.add(candidate);
+            return;
+        }
+        LOG.info("Successfully offloaded: " + candidate.getAppId()
+                + " of pipeline: " + candidate.getCorrespondingPipeline());
+    }
 
     public void addOffloadingStrategy(OffloadingStrategy<?> offloadingStrategy){
         this.offloadingStrategies.add(offloadingStrategy);
     }
 
+    /**
+     * Registers every strategy of the given list by delegating to
+     * {@code addOffloadingStrategy(OffloadingStrategy)} for each element.
+     *
+     * @param offloadingStrategies the offloading strategies to register
+     */
+    public void addOffloadingStrategies(List<OffloadingStrategy<?>> offloadingStrategies){
+        for(OffloadingStrategy<?> strategy : offloadingStrategies){
+            addOffloadingStrategy(strategy);
+        }
+    }
+
     public List<NodeInfoDescription> getOnlineNodes(){
         String endpoint = generateNodeManagementOnlineNodesEndpoint();
         return HttpUtils.get(endpoint, new TypeReference<List<NodeInfoDescription>>(){});
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
index cbe5b63..4def3fc 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/OffloadingStrategyFactory.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.node.controller.management.offloading.strategies;
 
 import org.apache.streampipes.node.controller.config.NodeConfiguration;
+import org.apache.streampipes.node.controller.management.node.NodeManager;
 import org.apache.streampipes.node.controller.management.offloading.strategies.policies.Comparator;
 import org.apache.streampipes.node.controller.management.offloading.strategies.policies.ThresholdViolationOffloadingPolicy;
 import org.apache.streampipes.node.controller.management.offloading.strategies.property.CPULoadResourceProperty;
@@ -27,35 +28,63 @@ import org.apache.streampipes.node.controller.management.offloading.strategies.p
 import org.apache.streampipes.node.controller.management.offloading.strategies.selection.PrioritySelectionStrategy;
 import org.apache.streampipes.node.controller.management.offloading.strategies.selection.RandomSelectionStrategy;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 public class OffloadingStrategyFactory {
 
-    public OffloadingStrategy select(){
+    public List<OffloadingStrategy<?>> select(){
 
         switch (NodeConfiguration.getAutoOffloadingStrategy()) {
             case CPU:
-                return new OffloadingStrategy<Float>(
-                        new ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER, 90f, 5),
-                        new CPULoadResourceProperty(), new RandomSelectionStrategy());
+                return Collections.singletonList(getDefaultCPUOffloadingPolicy());
             case MEM:
-                return new OffloadingStrategy<Long>(
-                        new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
-                                549755813888L, 5),
-                        new FreeMemoryResourceProperty(), new RandomSelectionStrategy());
+                return Collections.singletonList(getDefaultMemoryOffloadingPolicy());
             case DISK:
-                return new OffloadingStrategy<Long>(
-                        new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
-                                549755813888L, 5),
-                        new FreeDiskSpaceResourceProperty(), new RandomSelectionStrategy());
+                return Collections.singletonList(getDefaultDiskSpaceOffloadingPolicy());
             case DEBUG:
-                return new OffloadingStrategy<Float>(
+                return Collections.singletonList(new OffloadingStrategy<Float>(
                         new ThresholdViolationOffloadingPolicy<>(5,
                                 Comparator.GREATER, 0.5f, 1),
-                        new CPULoadResourceProperty(), new PrioritySelectionStrategy());
+                        new CPULoadResourceProperty(), new PrioritySelectionStrategy()));
             default:
-                return new OffloadingStrategy<Float>(
-                        new ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER, 90f, 4),
-                        new CPULoadResourceProperty(), new RandomSelectionStrategy());
+                return getDefaultStrategy();
         }
     }
 
+    /**
+     * Assembles the strategies returned by the {@code default} branch of {@code select()}:
+     * the default CPU, memory and disk space strategies, in that order.
+     *
+     * @return a new list containing the three default offloading strategies
+     */
+    private List<OffloadingStrategy<?>> getDefaultStrategy(){
+        List<OffloadingStrategy<?>> defaults = new ArrayList<>();
+        defaults.add(getDefaultCPUOffloadingPolicy());
+        defaults.add(getDefaultMemoryOffloadingPolicy());
+        defaults.add(getDefaultDiskSpaceOffloadingPolicy());
+        return defaults;
+    }
+
+    /**
+     * Creates the default CPU offloading strategy: a threshold violation policy using
+     * {@code Comparator.GREATER} and the value {@code 90f}, evaluated on the CPU load resource property and
+     * combined with random selection of the pipeline element.
+     * NOTE(review): the meaning of the remaining policy arguments (5 and 4) is not visible here - confirm
+     * against ThresholdViolationOffloadingPolicy.
+     *
+     * @return the default CPU offloading strategy
+     */
+    private OffloadingStrategy<Float> getDefaultCPUOffloadingPolicy(){
+        ThresholdViolationOffloadingPolicy<Float> cpuPolicy =
+                new ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER, 90f, 4);
+        return new OffloadingStrategy<>(cpuPolicy, new CPULoadResourceProperty(), new RandomSelectionStrategy());
+    }
+
+    /**
+     * Creates the default memory offloading strategy. The policy value is 15 % of the total memory
+     * reported by the node info description; it is compared with {@code Comparator.SMALLER} against the
+     * free memory resource property, and the pipeline element to offload is selected randomly.
+     *
+     * @return the default memory offloading strategy
+     */
+    private OffloadingStrategy<Long> getDefaultMemoryOffloadingPolicy(){
+        long memTotal = NodeManager.getInstance()
+                .getNodeInfoDescription()
+                .getNodeResources()
+                .getHardwareResource()
+                .getMemory()
+                .getMemTotal();
+        long memThreshold = (long) (memTotal * 0.15);
+        ThresholdViolationOffloadingPolicy<Long> memPolicy =
+                new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER, memThreshold, 4);
+        return new OffloadingStrategy<>(memPolicy, new FreeMemoryResourceProperty(), new RandomSelectionStrategy());
+    }
+
+    /**
+     * Creates the default disk space offloading strategy. The policy value is 10 % of the total disk size
+     * reported by the node info description; it is compared with {@code Comparator.SMALLER} against the
+     * free disk space resource property, and the pipeline element to offload is selected randomly.
+     *
+     * @return the default disk space offloading strategy
+     */
+    private OffloadingStrategy<Long> getDefaultDiskSpaceOffloadingPolicy(){
+        long totalDisk = NodeManager.getInstance().getNodeInfoDescription()
+                .getNodeResources().getHardwareResource().getDisk().getDiskTotal();
+        long threshold = (long) (totalDisk * 0.1);
+        // The disk threshold must be checked against free disk space; pairing it with the free memory
+        // property would compare a disk-derived value with memory readings.
+        return new OffloadingStrategy<>(
+                new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
+                        threshold, 4),
+                new FreeDiskSpaceResourceProperty(), new RandomSelectionStrategy());
+    }
+
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/CPULoadSelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/CPULoadSelectionStrategy.java
index 8658ae3..8bd7e20 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/CPULoadSelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/CPULoadSelectionStrategy.java
@@ -22,12 +22,12 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
+import java.util.List;
 
 public class CPULoadSelectionStrategy implements SelectionStrategy{
     @Override
-    public InvocableStreamPipesEntity select() {
+    public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
         //TODO: Migrate the PE with the highest load on CPU (or possibly other resource)
-        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
         return null;
     }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
index 44ad591..e52ec72 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/PrioritySelectionStrategy.java
@@ -32,22 +32,22 @@ import java.util.stream.Collectors;
 public class PrioritySelectionStrategy implements SelectionStrategy{
 
     @Override
-    public InvocableStreamPipesEntity select() {
+    public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
         List<InvocableStreamPipesEntity> runningPipelineElements = RunningInvocableInstances.INSTANCE.getAll();
 
         //List all other nodes that are online
-        // TODO: this involves request to central backend
-        List<NodeInfoDescription> onlineNodes = getNodeInfos().stream().filter(desc ->
+        // TODO: this involves a request to the central backend (deactivated; assess whether it provides a meaningful benefit)
+        /*List<NodeInfoDescription> onlineNodes = getNodeInfos().stream().filter(desc ->
                 !desc.getNodeControllerId().equals(NodeConfiguration.getNodeControllerId()))
-                .collect(Collectors.toList());
+                .collect(Collectors.toList());*/
 
         List<InvocableStreamPipesEntity> candidatesForOffloading = runningPipelineElements.stream()
+                        .filter(entity -> isNotBlacklisted(entity, blacklistedEntities))
                         .filter(InvocableStreamPipesEntity::isPreemption)
-                        .filter(entity -> verifyIfSupportedOnOtherNodes(entity, onlineNodes))
+                        //.filter(entity -> verifyIfSupportedOnOtherNodes(entity, onlineNodes))
                         .collect(Collectors.toList());
 
         if (candidatesForOffloading.isEmpty()) {
-            // TODO what to do when null?
             return null;
         } else {
             return selectPipelineElementWithLowestPriorityScore(candidatesForOffloading);
@@ -76,4 +76,9 @@ public class PrioritySelectionStrategy implements SelectionStrategy{
         }
         return !candidateNodes.isEmpty();
     }
+
+    /**
+     * Checks that no entry of the blacklist has the same deployment running instance id as the entity.
+     *
+     * @param entity the candidate pipeline element
+     * @param blacklist the entities that must not be selected
+     * @return {@code true} if no blacklisted entity shares the entity's deployment running instance id
+     */
+    private boolean isNotBlacklisted(InvocableStreamPipesEntity entity, List<InvocableStreamPipesEntity> blacklist){
+        for (InvocableStreamPipesEntity blacklistedEntity : blacklist) {
+            if (blacklistedEntity.getDeploymentRunningInstanceId().equals(entity.getDeploymentRunningInstanceId())) {
+                return false;
+            }
+        }
+        return true;
+    }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
index a37289c..e1de78c 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/RandomSelectionStrategy.java
@@ -27,7 +27,7 @@ import java.util.Random;
 public class RandomSelectionStrategy implements SelectionStrategy{
 
     @Override
-    public InvocableStreamPipesEntity select() {
+    public InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities) {
         List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll();
         if(instances.size() == 0)
             return null;
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
index 1964198..54ed1f8 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/selection/SelectionStrategy.java
@@ -20,6 +20,8 @@ package org.apache.streampipes.node.controller.management.offloading.strategies.
 
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
+import java.util.List;
+
 public interface SelectionStrategy {
-    InvocableStreamPipesEntity select();
+    InvocableStreamPipesEntity select(List<InvocableStreamPipesEntity> blacklistedEntities);
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementOffloadingHandler.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementOffloadingHandler.java
index 59e6e34..7b8dc5e 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementOffloadingHandler.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/handler/PipelineElementOffloadingHandler.java
@@ -41,7 +41,8 @@ public class PipelineElementOffloadingHandler implements IHandler<Response> {
     @Override
     public Response handle() {
         String url = generatePipelineManagementOffloadEndpoint();
-        return HttpUtils.post(url, graph);
+        String bearerToken = NodeConfiguration.getNodeApiKey();
+        return HttpUtils.post(url, bearerToken, graph, Response.class);
     }
 
     private String generatePipelineManagementOffloadEndpoint() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
index 0107388..5710a5c 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
@@ -49,7 +49,7 @@ public class ResourceManager {
 
     private ResourceManager() {
         //Offloading Policy
-        OffloadingPolicyManager.getInstance().addOffloadingStrategy(new OffloadingStrategyFactory().select());
+        OffloadingPolicyManager.getInstance().addOffloadingStrategies(new OffloadingStrategyFactory().select());
     }
 
     public static ResourceManager getInstance() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
index 71beb71..f1e0536 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
@@ -166,6 +166,35 @@ public class HttpUtils {
         }
     }
 
+    /**
+     * Serializes the given object, sends it as JSON body of a POST request carrying the bearer token in
+     * the {@code Authorization} header, and converts the result according to the requested class.
+     * <p>
+     * A requested {@code Response} yields the raw response, {@code String} the body as string,
+     * {@code byte[]} the body as bytes and {@code Boolean} {@code TRUE} once the request was executed;
+     * any other class is passed to {@code deserialize(response, clazz)}.
+     *
+     * @param url the endpoint to post to
+     * @param bearerToken the token sent as {@code Bearer} authorization
+     * @param object the payload to serialize
+     * @param clazz the class describing the desired return type
+     * @return the response converted as described above; {@code Boolean.FALSE} if a {@code Boolean} was
+     *         requested and the request failed with an {@code IOException}
+     * @throws SpRuntimeException if the request fails with an {@code IOException} and no {@code Boolean}
+     *         was requested
+     */
+    @SuppressWarnings("unchecked")
+    public static <T1, T2> T2 post(String url, String bearerToken, T1 object, Class<T2> clazz) {
+        String body = serialize(object);
+        try {
+            Response response = Request.Post(url)
+                    .bodyString(body, ContentType.APPLICATION_JSON)
+                    .addHeader("Authorization", "Bearer " + bearerToken)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+
+            // Class#isInstance(X.class) asks whether the Class object X.class itself is an instance of
+            // clazz, which is never the case for Response.class or String.class. isAssignableFrom
+            // compares the types, as the byte[] and Boolean branches below do.
+            if (clazz.isAssignableFrom(Response.class)) {
+                return (T2) response;
+            } else if (clazz.isAssignableFrom(String.class)) {
+                return (T2) response.returnContent().asString();
+            } else if (clazz.isAssignableFrom(byte[].class)) {
+                return (T2) response.returnContent().asBytes();
+            } else if (clazz.isAssignableFrom(Boolean.class)) {
+                return (T2) Boolean.TRUE;
+            } else {
+                return deserialize(response, clazz);
+            }
+        } catch (IOException e) {
+            if (clazz.isAssignableFrom(Boolean.class)) {
+                return (T2) Boolean.FALSE;
+            }
+
+            throw new SpRuntimeException("Something went wrong during POST request", e);
+        }
+    }
+
     public static <T> org.apache.streampipes.model.Response post(String url, T object) {
         String body = serialize(object);
         org.apache.streampipes.model.Response response = new org.apache.streampipes.model.Response();
@@ -243,6 +272,9 @@ public class HttpUtils {
     public static <T>T deserialize(Response response, Class<T> clazz) {
         try {
             String responseString = response.returnContent().asString();
+            //TODO: Fix issue when starting connect adapters (& remove quick fix)
+            if(clazz.equals(String.class))
+                return (T) responseString;
             return JacksonSerializer
                     .getObjectMapper()
                     .readValue(responseString, clazz);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index 2d01aad..ad5614a 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@ -61,10 +61,10 @@ public class MigrationPipelineGenerator {
         eligibleTargetNodes = filterResourceUtilization(eligibleTargetNodes);
 
         //Different strategies possible (atm cancel offloading)
-        if(eligibleTargetNodes == null || eligibleTargetNodes.isEmpty())
+        if(eligibleTargetNodes.isEmpty())
             return null;
 
-        //Random Selection of new node within the remaining eligibile nodes
+        //Random Selection of new node within the remaining eligible nodes
         randomSelectionAndUpdate(eligibleTargetNodes);
 
         return generateTargetPipeline();
@@ -72,14 +72,11 @@ public class MigrationPipelineGenerator {
 
     private List<NodeInfoDescription> findEligibleTargetNodes(){
 
-        List<NodeInfoDescription> eligibileTargetNodes = new ArrayList<>();
         List<NodeInfoDescription> onlineNodes = NodeManagement.getInstance().getOnlineNodes();
 
-        onlineNodes.stream()
+        return onlineNodes.stream()
                 .filter(this::matchAndVerify)
-                .map(eligibileTargetNodes::add);
-
-        return eligibileTargetNodes;
+                .collect(Collectors.toList());
     }
 
     private boolean matchAndVerify(NodeInfoDescription node) {
@@ -111,8 +108,12 @@ public class MigrationPipelineGenerator {
             String nodeControllerId = nodeInfo.getNodeControllerId();
             Queue<ResourceMetrics> rmHistory = ClusterResourceMonitor.getNodeResourceMetricsById(nodeControllerId);
 
-            if(rmHistory == null)
-                return null;
+            if(rmHistory == null){
+                //If no ResourceMetrics history is available (e.g. shortly after Backend start), consider node as
+                // possible target node
+                filteredTargetNodes.add(nodeInfo);
+                continue;
+            }
 
             Hardware hardware = entityToMigrate.getResourceRequirements().stream()
                     .filter(nodeRR -> nodeRR instanceof Hardware)
@@ -127,6 +128,8 @@ public class MigrationPipelineGenerator {
                         && hardware.getMemory() <= MEM_MULTIPLICATION_FACTOR * rmHistory.peek().getFreeMemoryInBytes()) {
                     filteredTargetNodes.add(nodeInfo);
                 }
+            } else{
+                filteredTargetNodes.add(nodeInfo);
             }
         }
         return filteredTargetNodes;