You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/05/10 21:03:15 UTC

[incubator-streampipes] 04/07: Refactored and standardized offloading

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit d27684af4bb2c70c8aa96a1301d334ce7f97fc47
Author: Daniel Gomm <da...@outlook.de>
AuthorDate: Thu May 6 13:43:36 2021 +0200

    Refactored and standardized offloading
---
 .../backend/StreamPipesBackendApplication.java     |  2 +
 .../model/message/NotificationType.java            |  1 +
 .../model/resource}/ResourceMetrics.java           |  2 +-
 .../offloading/OffloadingPolicyManager.java        | 17 +++-
 .../offloading/model/OffloadingStrategy.java       |  7 ++
 .../model/OffloadingStrategyFactory.java           | 59 +++++++++++++
 .../model/property/CPULoadResourceProperty.java    |  2 +-
 .../property/FreeDiskSpaceResourceProperty.java    |  2 +-
 .../model/property/FreeMemoryResourceProperty.java |  2 +-
 .../model/property/ResourceProperty.java           |  2 +-
 .../management/pe/InvocableElementManager.java     | 57 ++++++++++---
 .../management/resource/ResourceManager.java       | 12 +--
 .../migration/MigrationPipelineGenerator.java      | 99 ++++++++++++++++++----
 .../manager/node/StreamPipesClusterManager.java    |  7 +-
 .../resources/ClusterResourceManager.java          | 95 ++++++++++++---------
 .../org/apache/streampipes/rest/impl/Node.java     |  4 -
 16 files changed, 277 insertions(+), 93 deletions(-)

diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 899eb8d..aced851 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.backend;
 import org.apache.shiro.web.env.EnvironmentLoaderListener;
 import org.apache.shiro.web.servlet.OncePerRequestFilter;
 import org.apache.shiro.web.servlet.ShiroFilter;
+import org.apache.streampipes.manager.node.management.resources.ClusterResourceManager;
 import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
@@ -60,6 +61,7 @@ public class StreamPipesBackendApplication {
     ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
 
     executorService.schedule(this::startAllPreviouslyStoppedPipelines, 5, TimeUnit.SECONDS);
+    executorService.scheduleAtFixedRate(ClusterResourceManager.getInstance()::checkResources, 30l, 60l, TimeUnit.SECONDS);
   }
 
   @PreDestroy
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
index 270d77d..a0a8e7d 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
@@ -33,6 +33,7 @@ public enum NotificationType {
 	NO_MATCHING_PROTOCOL_CONNECTION("Not a valid connection", "No supported input protocol matches provided output protocol"),
 	REMOTE_SERVER_NOT_ACCESSIBLE("Can't connect to remote server", "Please contact the admin of the system"),
 	NO_MATCHING_SCHEME("The JSON from the server is not valid", "The keys in the element description don't map the keys in the JSON response"),
+	NO_NODE_FOUND("No node found", "No node fulfilling the requirements is available"),
 
 	LOGIN_FAILED("Login failed", "Please re-enter your password"),
 	LOGIN_SUCCESS("Login success", ""),
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/ResourceMetrics.java
similarity index 97%
rename from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/resource/ResourceMetrics.java
index ddaaff6..a70bb4b 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/ResourceMetrics.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.management.resource.model;
+package org.apache.streampipes.model.resource;
 
 public class ResourceMetrics {
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index f041695..ed0d922 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -18,9 +18,13 @@
 
 package org.apache.streampipes.node.controller.management.offloading;
 
+import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.node.controller.management.offloading.model.OffloadingStrategy;
 import org.apache.streampipes.node.controller.management.pe.InvocableElementManager;
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.resource.ResourceMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -29,6 +33,7 @@ public class OffloadingPolicyManager {
 
     private final List<OffloadingStrategy<?>> offloadingStrategies = new ArrayList<>();
     private static OffloadingPolicyManager instance;
+    private static final Logger LOG = LoggerFactory.getLogger(OffloadingPolicyManager.class.getCanonicalName());
 
     public static OffloadingPolicyManager getInstance(){
         if(instance == null){
@@ -41,7 +46,15 @@ public class OffloadingPolicyManager {
         for(OffloadingStrategy strategy:offloadingStrategies){
             strategy.getOffloadingPolicy().addValue(strategy.getResourceProperty().getProperty(rm));
             if(strategy.getOffloadingPolicy().isViolated()){
-                InvocableElementManager.getInstance().postOffloadRequest(strategy.getSelectionStrategy().selectEntity());
+                InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().selectEntity();
+                if(offloadEntity != null){
+                    Response resp = InvocableElementManager.getInstance().postOffloadRequest(offloadEntity);
+                    if(resp.isSuccess())
+                        LOG.info("Successfully offloaded: " + offloadEntity.getAppId()
+                                + " from Pipeline: " + offloadEntity.getCorrespondingPipeline());
+                    else LOG.info("Failed to offload: " + offloadEntity.getAppId()
+                            + " from Pipeline: " + offloadEntity.getCorrespondingPipeline());
+                }else LOG.info("No entity to offload found");
             }
         }
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategy.java
index 8a6218d..d359508 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategy.java
@@ -21,17 +21,24 @@ package org.apache.streampipes.node.controller.management.offloading.model;
 import org.apache.streampipes.node.controller.management.offloading.model.policies.OffloadingPolicy;
 import org.apache.streampipes.node.controller.management.offloading.model.property.ResourceProperty;
 import org.apache.streampipes.node.controller.management.offloading.model.selection.SelectionStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class OffloadingStrategy<T> {
     private SelectionStrategy selectionStrategy;
     private OffloadingPolicy<T> offloadingPolicy;
     private ResourceProperty<T> resourceProperty;
+    private static final Logger LOG = LoggerFactory.getLogger(OffloadingStrategy.class.getCanonicalName());
 
     public OffloadingStrategy(OffloadingPolicy offloadingPolicy, ResourceProperty resourceProperty,
                               SelectionStrategy selectionStrategy){
         this.offloadingPolicy = offloadingPolicy;
         this.resourceProperty = resourceProperty;
         this.selectionStrategy = selectionStrategy;
+        LOG.info("Registered offloading strategy: "
+                + this.selectionStrategy.getClass().getSimpleName() + " | "
+                + this.resourceProperty.getClass().getSimpleName() + " | "
+                + this.offloadingPolicy.getClass().getSimpleName());
     }
 
     public SelectionStrategy getSelectionStrategy() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java
new file mode 100644
index 0000000..19e852e
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.node.controller.management.offloading.model;
+
+import org.apache.streampipes.node.controller.management.offloading.model.policies.Comparator;
+import org.apache.streampipes.node.controller.management.offloading.model.policies.OffloadingPolicy;
+import org.apache.streampipes.node.controller.management.offloading.model.policies.ThresholdViolationOffloadingPolicy;
+import org.apache.streampipes.node.controller.management.offloading.model.property.CPULoadResourceProperty;
+import org.apache.streampipes.node.controller.management.offloading.model.property.FreeDiskSpaceResourceProperty;
+import org.apache.streampipes.node.controller.management.offloading.model.property.FreeMemoryResourceProperty;
+import org.apache.streampipes.node.controller.management.offloading.model.selection.CPULoadSelectionStrategy;
+import org.apache.streampipes.node.controller.management.offloading.model.selection.RandomSelectionStrategy;
+import org.apache.streampipes.node.controller.management.offloading.model.selection.SelectionStrategy;
+
+public class OffloadingStrategyFactory {
+
+    /**
+     * Creates the offloading strategy named by the SP_AUTO_OFFLOADING_POLICY environment variable.
+     * "CPU", an unset or empty variable and any unrecognized value all yield the CPU-load strategy,
+     * so this method never returns null.
+     */
+    public OffloadingStrategy getFromEnv(){
+        String policy = System.getenv("SP_AUTO_OFFLOADING_POLICY");
+        switch (policy == null ? "" : policy){
+            case "debug":
+                return new OffloadingStrategy<Float>(new ThresholdViolationOffloadingPolicy<>(5,
+                        Comparator.GREATER,0.5f, 3),
+                        new CPULoadResourceProperty(), new RandomSelectionStrategy());
+            case "memory":
+                return new OffloadingStrategy<Long>(new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
+                        549755813888L, 5),
+                        new FreeMemoryResourceProperty(), new RandomSelectionStrategy());
+            case "disk space":
+                return new OffloadingStrategy<Long>(new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
+                        549755813888L, 5),
+                        new FreeDiskSpaceResourceProperty(), new RandomSelectionStrategy());
+            default:
+                return new OffloadingStrategy<Float>(new ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER,90f, 4),
+                        new CPULoadResourceProperty(), new RandomSelectionStrategy());
+        }
+    }
+
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/CPULoadResourceProperty.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/CPULoadResourceProperty.java
index 7beb720..b695c4f 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/CPULoadResourceProperty.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/CPULoadResourceProperty.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.node.controller.management.offloading.model.property;
 
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.resource.ResourceMetrics;
 
 public class CPULoadResourceProperty implements ResourceProperty<Float>{
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeDiskSpaceResourceProperty.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeDiskSpaceResourceProperty.java
index 840de8d..31e7ab8 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeDiskSpaceResourceProperty.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeDiskSpaceResourceProperty.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.node.controller.management.offloading.model.property;
 
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.resource.ResourceMetrics;
 
 public class FreeDiskSpaceResourceProperty implements ResourceProperty<Long>{
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeMemoryResourceProperty.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeMemoryResourceProperty.java
index 11c5ce3..e58c4a3 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeMemoryResourceProperty.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeMemoryResourceProperty.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.node.controller.management.offloading.model.property;
 
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.resource.ResourceMetrics;
 
 public class FreeMemoryResourceProperty implements ResourceProperty<Long> {
     @Override
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/ResourceProperty.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/ResourceProperty.java
index 9b61663..13abc3f 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/ResourceProperty.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/ResourceProperty.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.node.controller.management.offloading.model.property;
 
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.resource.ResourceMetrics;
 
 public interface ResourceProperty<T> {
     T getProperty(ResourceMetrics resourceMetrics);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
index 8f25aa7..1a318d9 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
@@ -167,15 +167,13 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
         return response;
     }
 
-    public void postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
+    public Response postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
         try {
             String url = generateBackendOffloadEndpoint();
             String desc = toJson(instanceToOffload);
-            Request.Post(url)
-                    .bodyString(desc, ContentType.APPLICATION_JSON)
-                    .execute();
+            return handleResponse(Request.Post(url).bodyString(desc, ContentType.APPLICATION_JSON).execute());
         } catch (IOException e) {
-            e.printStackTrace();
+            throw new SpRuntimeException(e);
         }
     }
 
@@ -275,14 +273,49 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
     }
 
     private List<ConsumableStreamPipesEntity> getSupportedEntities(InvocableRegistration registration){
-        //Check if the node supports the entity (atm checks if it has a GPU) TODO: simplify; check for other Hardware
-        return registration.getSupportedPipelineElements().stream().filter(nse -> (!nse.getResourceRequirements().stream()
+        //Check if the node supports the entity (atm only hardware requirements; could be expanded to include
+        // software requirements)
+        return registration.getSupportedPipelineElements().stream()
+                .filter(this::checkGpuRequirement)
+                .filter(this::checkCpuRequirement)
+                .filter(this::checkMemoryRequirement)
+                //.filter(this::checkDiskSpaceRequirement) currently does not work (node diskspace is falsely set to 0
+                // in Node Hardware Resources, regardless of actual free diskspace) TODO: fix this issue
+                    .collect(Collectors.toList());
+    }
+
+    private boolean checkGpuRequirement(ConsumableStreamPipesEntity spEntity){
+        boolean entityRequiresGpu = spEntity.getResourceRequirements().stream()
                 .filter(req -> req instanceof Hardware).filter(req -> ((Hardware) req).isGpu()).map(req -> ((Hardware) req)
-                        .isGpu()).findFirst().orElse(false) ||
-                nse.getResourceRequirements().stream().filter(req -> req instanceof Hardware)
-                        .filter(req -> ((Hardware) req).isGpu()).map(req -> ((Hardware) req)
-                        .isGpu()).findFirst().orElse(false) == getNodeInfoDescription()
-                .getNodeResources().getHardwareResource().getGpu().getCores()>0)).collect(Collectors.toList());
+                        .isGpu()).findFirst().orElse(false);
+        boolean nodeHasGPU = (getNodeInfoDescription().getNodeResources().getHardwareResource().getGpu().getCores()>0);
+        return (!entityRequiresGpu || nodeHasGPU);
+    }
+
+    // True when the entity's first Hardware requirement (GPU or not; none counts as 0) fits the node's CPU cores.
+    private boolean checkCpuRequirement(ConsumableStreamPipesEntity spEntity){
+        int requiredCPUCores = spEntity.getResourceRequirements().stream().filter(req -> req instanceof Hardware)
+                .map(req -> ((Hardware) req).getCpuCores()).findFirst().orElse(0);
+        int nodeCPUCores = getNodeInfoDescription().getNodeResources().getHardwareResource().getCpu().getCores();
+        return (requiredCPUCores <= nodeCPUCores);
+    }
+
+    // True when the entity's first Hardware requirement (GPU or not; none counts as 0) fits the node's memory.
+    private boolean checkMemoryRequirement(ConsumableStreamPipesEntity spEntity){
+        long requiredMemory = spEntity.getResourceRequirements().stream().filter(req -> req instanceof Hardware)
+                .map(req -> ((Hardware) req).getMemory()).findFirst().orElse(0L);
+        //Looks at total memory (could be adjusted to consider currently available Memory)
+        long actualNodeMemory = getNodeInfoDescription().getNodeResources().getHardwareResource().getMemory().getMemTotal();
+        return (requiredMemory <= actualNodeMemory);
+    }
+
+    // True when the entity's first Hardware requirement (GPU or not; none counts as 0) fits the node's disk.
+    private boolean checkDiskSpaceRequirement(ConsumableStreamPipesEntity spEntity){
+        long requiredDiskSpace = spEntity.getResourceRequirements().stream().filter(req -> req instanceof Hardware)
+                .map(req -> ((Hardware) req).getDisk()).findFirst().orElse(0L);
+        //Looks at total diskspace (could be adjusted to consider currently available diskspace)
+        long actualNodeDiskSpace = getNodeInfoDescription().getNodeResources().getHardwareResource().getDisk().getDiskTotal();
+        return (requiredDiskSpace <= actualNodeDiskSpace);
+    }
 
     private List<InvocableStreamPipesEntity> getAllInvocables() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
index a65039a..c3df5b9 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
@@ -21,12 +21,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.node.controller.config.NodeConfiguration;
 import org.apache.streampipes.node.controller.management.offloading.OffloadingPolicyManager;
-import org.apache.streampipes.node.controller.management.offloading.model.OffloadingStrategy;
-import org.apache.streampipes.node.controller.management.offloading.model.policies.Comparator;
-import org.apache.streampipes.node.controller.management.offloading.model.policies.ThresholdViolationOffloadingPolicy;
-import org.apache.streampipes.node.controller.management.offloading.model.property.CPULoadResourceProperty;
-import org.apache.streampipes.node.controller.management.offloading.model.selection.RandomSelectionStrategy;
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.node.controller.management.offloading.model.OffloadingStrategyFactory;
+import org.apache.streampipes.model.resource.ResourceMetrics;
 import org.apache.streampipes.node.controller.management.resource.utils.DiskSpace;
 import org.apache.streampipes.node.controller.management.resource.utils.ResourceUtils;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
@@ -53,9 +49,7 @@ public class ResourceManager {
 
     private ResourceManager() {
         //Offloading Policy
-        OffloadingPolicyManager.getInstance().addOffloadingStrategy(new OffloadingStrategy<Float>(new
-                ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER,60f, 5),
-                new CPULoadResourceProperty(), new RandomSelectionStrategy()));
+        OffloadingPolicyManager.getInstance().addOffloadingStrategy(new OffloadingStrategyFactory().getFromEnv());
     }
 
     public static ResourceManager getInstance() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index b8a688e..0545d22 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@ -19,42 +19,100 @@ package org.apache.streampipes.manager.migration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.streampipes.manager.node.StreamPipesClusterManager;
+import org.apache.streampipes.manager.node.management.resources.ClusterResourceManager;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.resource.Hardware;
+import org.apache.streampipes.model.resource.ResourceMetrics;
 
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Random;
+import java.util.*;
+import java.util.stream.Collectors;
 
 public class MigrationPipelineGenerator {
 
-    public static Pipeline generateMigrationPipeline(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
+    private InvocableStreamPipesEntity entityToMigrate;
+    private Pipeline correspondingPipeline;
 
+    public MigrationPipelineGenerator(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
+        this.entityToMigrate = entityToMigrate;
+        this.correspondingPipeline = correspondingPipeline;
+    }
+
+
+    /**
+     * Narrows the candidate nodes for the entity, retargets the entity to one of them and builds the pipeline.
+     *
+     * @return the result of generateTargetPipeline(), or null when no candidate node remains (offloading cancelled)
+     */
+    public Pipeline generateMigrationPipeline(){
+        List<NodeInfoDescription> possibleTargetNodes = getNodeInfos();
+        // Only "custom" has a strategy so far; comparing from the literal also tolerates a missing policy
+        //TODO: incorporate strategy for locality-aware deployment
+        //TODO: incorporate strategy for default deployment
+        if("custom".equals(correspondingPipeline.getExecutionPolicy())){
+            possibleTargetNodes = filterLocationTags(possibleTargetNodes);
+        }
+
+        //Check current resource utilization on node
+        possibleTargetNodes = filterResourceUtilization(possibleTargetNodes);
+
+        //Different strategies possible (atm cancel offloading)
+        if(possibleTargetNodes == null || possibleTargetNodes.isEmpty()){
+            return null;
+        }
+
+        //Random Selection of new Node within the remaining possible nodes
+        changeEntityDescriptionToMatchRandomNode(possibleTargetNodes);
+        return generateTargetPipeline();
+    }
+
+    private List<NodeInfoDescription> getNodeInfos(){
         List<NodeInfoDescription> possibleTargetNodes = new ArrayList<>();
         List<NodeInfoDescription> nodeInfo = StreamPipesClusterManager.getAllActiveAndHealthyNodes();
         nodeInfo.forEach(desc ->{
             if(desc.getSupportedElements().stream().anyMatch(element -> element.equals(entityToMigrate.getAppId()))
-                && !desc.getNodeControllerId().equals(entityToMigrate.getDeploymentTargetNodeId()))
+                    && !desc.getNodeControllerId().equals(entityToMigrate.getDeploymentTargetNodeId()))
                 possibleTargetNodes.add(desc);
         });
+        return possibleTargetNodes;
+    }
 
-        if(possibleTargetNodes.isEmpty())
-            return null;
+    private List<NodeInfoDescription> filterLocationTags(List<NodeInfoDescription> possibleTargetNodes){
+        return possibleTargetNodes.stream()
+                .filter(desc -> nodeTagsContainElementTag(correspondingPipeline.getNodeTags(), desc))
+                .collect(Collectors.toList());
+    }
 
-        //Choose random node; should be adjusted to seek for a proper node to migrate to (e.g. based on user e.g.
-        // selected labels, locality, free resources,...)
-        NodeInfoDescription targetNode = possibleTargetNodes.get(new Random().nextInt(possibleTargetNodes.size()));
+    private boolean nodeTagsContainElementTag(List<String> pipelineNodeTags,
+                                              NodeInfoDescription desc){
+        return desc.getStaticNodeMetadata().getLocationTags().stream().anyMatch(pipelineNodeTags::contains);
+    }
 
-        entityToMigrate.setDeploymentTargetNodeHostname(targetNode.getHostname());
-        entityToMigrate.setDeploymentTargetNodeId(targetNode.getNodeControllerId());
-        entityToMigrate.setDeploymentTargetNodePort(targetNode.getPort());
-        entityToMigrate.setElementEndpointHostname(targetNode.getHostname());
-        entityToMigrate.setElementEndpointPort(targetNode.getPort());
+    /**
+     * Keeps the nodes whose head resource-metrics sample reports enough free disk space and memory for the
+     * entity. Nodes without a sample are skipped; an entity without Hardware requirement fits every node.
+     */
+    private List<NodeInfoDescription> filterResourceUtilization(List<NodeInfoDescription> possibleTargetNodes){
+        Hardware hardware = entityToMigrate.getResourceRequirements().stream()
+                .filter(nodeRR -> nodeRR instanceof Hardware).map(nodeRR -> (Hardware)nodeRR).findFirst().orElse(null);
+        if(hardware == null) return new ArrayList<>(possibleTargetNodes);
+        List<NodeInfoDescription> filteredTargetNodes = new ArrayList<>();
+        for(NodeInfoDescription nodeInfo : possibleTargetNodes){
+            Queue<ResourceMetrics> rmHistory = ClusterResourceManager.getResourceMetricsMap()
+                    .get(nodeInfo.getNodeControllerId());
+            ResourceMetrics sample = (rmHistory == null) ? null : rmHistory.peek();
+            if(sample != null && hardware.getDisk() <= sample.getFreeDiskSpaceInBytes()
+                    && hardware.getMemory() <= sample.getFreeMemoryInBytes()) {
+                filteredTargetNodes.add(nodeInfo);
+            }
+        }
+        return filteredTargetNodes;
+    }
 
+    private Pipeline generateTargetPipeline(){
         Optional<DataProcessorInvocation> originalInvocation =
                 correspondingPipeline.getSepas().stream().filter(dp ->
                         dp.getDeploymentRunningInstanceId().equals(entityToMigrate.getDeploymentRunningInstanceId()))
@@ -75,4 +133,13 @@ public class MigrationPipelineGenerator {
         return targetPipeline;
     }
 
+    /**
+     * Picks one of the given nodes at random and rewrites the deployment target and
+     * element endpoint of the entity to migrate so that they point to that node.
+     *
+     * @param nodes candidate target nodes
+     */
+    private void changeEntityDescriptionToMatchRandomNode(List<NodeInfoDescription> nodes){
+        Random generator = new Random();
+        int chosenIndex = generator.nextInt(nodes.size());
+        NodeInfoDescription chosen = nodes.get(chosenIndex);
+
+        // Deployment target
+        entityToMigrate.setDeploymentTargetNodeHostname(chosen.getHostname());
+        entityToMigrate.setDeploymentTargetNodeId(chosen.getNodeControllerId());
+        entityToMigrate.setDeploymentTargetNodePort(chosen.getPort());
+
+        // Element endpoint
+        entityToMigrate.setElementEndpointHostname(chosen.getHostname());
+        entityToMigrate.setElementEndpointPort(chosen.getPort());
+    }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
index 3ab464b..f792197 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
@@ -170,11 +170,10 @@ public class StreamPipesClusterManager extends AbstractClusterManager {
 
     public static Message handleOffloadRequest(InvocableStreamPipesEntity elementToMigrate) {
         Pipeline currentPipeline = getPipelineStorageApi().getPipeline(elementToMigrate.getCorrespondingPipeline());
-        Pipeline offloadPipeline = MigrationPipelineGenerator.generateMigrationPipeline(elementToMigrate,
-                currentPipeline);
-        //TODO: Handle this case properly
+        Pipeline offloadPipeline = new MigrationPipelineGenerator(elementToMigrate, currentPipeline)
+                .generateMigrationPipeline();
         if(offloadPipeline == null)
-            return Notifications.error(NotificationType.UNKNOWN_ERROR);
+            return Notifications.error(NotificationType.NO_NODE_FOUND);
 
         try {
             PipelineOperationStatus status = Operations.handlePipelineElementMigration(offloadPipeline,
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
index 7b75c19..47cea26 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
@@ -17,21 +17,29 @@
  */
 package org.apache.streampipes.manager.node.management.resources;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
 import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.model.resource.ResourceMetrics;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.storage.api.INodeInfoStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.Socket;
-import java.net.URL;
+import java.io.IOException;
+import java.net.*;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 
 public class ClusterResourceManager {
 
-    private static final int RESOURCE_RETRIEVE_FREQUENCY_MS = 60000;
-    private static final int SOCKET_TIMEOUT_MS = 500;
+    // Socket timeout in milliseconds applied to the metrics request sent to each node.
+    private static final int SOCKET_TIMEOUT_MS = 1000;
     private static ClusterResourceManager instance = null;
+    // Recorded metrics per node, keyed by node controller id.
+    private static Map<String, Queue<ResourceMetrics>> resourceMetricsMap = new HashMap<>();
 
     private ClusterResourceManager() {}
 
@@ -45,47 +53,33 @@ public class ClusterResourceManager {
         return instance;
     }
 
-    public void run() {
-        new Thread(getNodes, "nodes").start();
+    /**
+     * Returns the recorded resource metrics, keyed by node controller id.
+     * The internal map itself is returned, not a copy.
+     *
+     * @return map from node controller id to the metrics queue of that node
+     */
+    public static Map<String, Queue<ResourceMetrics>> getResourceMetricsMap(){
+        return resourceMetricsMap;
     }
 
-    private final Runnable getNodes = () -> {
-        while (true) {
-            try {
-                List<NodeInfoDescription> nodes =  getNodeStorageApi().getAllNodes();
-                if (nodes.size() > 0) {
-                    nodes.forEach(node -> {
-                        try {
-                            URL nodeUrl = generateNodeUrl(node);
-                            // TODO: gather current resources from all active node controller endpoints
-
-                        } catch (MalformedURLException e) {
-                            e.printStackTrace();
-                        }
-                    });
+    /**
+     * Requests the resource metrics of every active node and records them in the
+     * per-node metrics map. The checked exceptions of a request are caught inside
+     * the per-node lambda and printed.
+     */
+    public void checkResources(){
+        List<NodeInfoDescription> nodes =  getNodeStorageApi().getAllActiveNodes();
+        if (nodes.size() > 0) {
+            nodes.forEach(node -> {
+                try {
+                    URL nodeUrl = generateNodeUrl(node);
+                    // HTTP GET against the node's resource endpoint, bounded by the socket timeout.
+                    Response resp = Request.Get(nodeUrl.toURI()).socketTimeout(SOCKET_TIMEOUT_MS).execute();
+                    // NOTE(review): extractResourceMetrics returns null when the body cannot be
+                    // parsed; that value is handed to addResourceMetrics unchanged.
+                    addResourceMetrics(node, extractResourceMetrics(resp.returnContent().asString()));
+                } catch (MalformedURLException e) {
+                    e.printStackTrace();
+                } catch (ClientProtocolException e) {
+                    e.printStackTrace();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                } catch (URISyntaxException e) {
+                    e.printStackTrace();
                 }
-                Thread.sleep(RESOURCE_RETRIEVE_FREQUENCY_MS);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
+            });
         }
-    };
-
-    private URL generateNodeUrl(NodeInfoDescription desc) throws MalformedURLException {
-        return new URL("http", desc.getHostname(), desc.getPort(), "");
     }
 
-    private boolean healthCheck(URL url) {
-        boolean isAlive = true;
-        try {
-            InetSocketAddress sa = new InetSocketAddress(url.getHost(), url.getPort());
-            Socket ss = new Socket();
-            ss.connect(sa, SOCKET_TIMEOUT_MS);
-            ss.close();
-        } catch(Exception e) {
-            isAlive = false;
-        }
-        return isAlive;
+    /**
+     * Builds the HTTP URL of the resource endpoint of the given node.
+     *
+     * @param desc node whose hostname and port are used
+     * @return URL of the node's resource endpoint
+     * @throws MalformedURLException if the URL cannot be constructed
+     */
+    private URL generateNodeUrl(NodeInfoDescription desc) throws MalformedURLException {
+        final String scheme = "http";
+        final String resourcePath = "/api/v2/node/info/resources";
+        return new URL(scheme, desc.getHostname(), desc.getPort(), resourcePath);
     }
 
     // Helpers
@@ -93,4 +87,23 @@ public class ClusterResourceManager {
+    /**
+     * Looks up the node storage through the storage dispatcher's NoSQL store.
+     *
+     * @return the node info storage
+     */
     private static INodeInfoStorage getNodeStorageApi() {
         return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage();
     }
+
+    /**
+     * Deserializes a JSON string into a {@link ResourceMetrics} object.
+     *
+     * @param rm JSON representation of the metrics
+     * @return the parsed metrics, or {@code null} if the JSON could not be processed
+     */
+    private ResourceMetrics extractResourceMetrics(String rm){
+        ResourceMetrics parsed = null;
+        try {
+            parsed = JacksonSerializer.getObjectMapper().readValue(rm, ResourceMetrics.class);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        }
+        return parsed;
+    }
+
+    /**
+     * Appends a metrics sample to the bounded history of the given node, evicting the
+     * oldest sample when the history is full.
+     *
+     * @param node node the sample belongs to
+     * @param rm   sample to record; {@code null} is ignored
+     */
+    private void addResourceMetrics(NodeInfoDescription node, ResourceMetrics rm){
+        // ArrayBlockingQueue rejects null elements with a NullPointerException, so a
+        // missing sample is dropped here instead of being offered to the queue.
+        if (rm == null) {
+            return;
+        }
+        Queue<ResourceMetrics> history = resourceMetricsMap.computeIfAbsent(
+                node.getNodeControllerId(), id -> new ArrayBlockingQueue<ResourceMetrics>(10));
+        if (!history.offer(rm)) {
+            // History is full: drop the oldest sample to make room for the new one.
+            history.poll();
+            history.offer(rm);
+        }
+    }
+
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
index 5ccd75b..e3644c9 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
@@ -17,15 +17,11 @@
  */
 package org.apache.streampipes.rest.impl;
 
-import org.apache.streampipes.manager.migration.MigrationPipelineGenerator;
 import org.apache.streampipes.manager.node.StreamPipesClusterManager;
-import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.message.NotificationType;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.rest.api.INode;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;