You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/05/10 21:03:18 UTC

[incubator-streampipes] 07/07: Refactoring and added priority based offloading

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 589dec6de316e969f6c27f9ca5f343260b3efe07
Author: Daniel Gomm <da...@outlook.de>
AuthorDate: Mon May 10 23:01:14 2021 +0200

    Refactoring and added priority based offloading
---
 .../model/base/InvocableStreamPipesEntity.java     | 22 ++++++
 .../streampipes/model/pipeline/Pipeline.java       |  4 ++
 .../offloading/AutoOffloadingManager.java          | 42 -----------
 .../offloading/OffloadingPolicyManager.java        | 34 +++++++++
 .../model/OffloadingStrategyFactory.java           |  5 +-
 .../model/selection/PrioritySelectionStrategy.java | 69 ++++++++++++++++++
 .../management/pe/InvocableElementManager.java     | 53 ++------------
 .../management/resource/utils/FileSystemType.java  |  3 +-
 .../management/resource/utils/ResourceChecker.java | 82 ++++++++++++++++++++++
 .../management/resource/utils/ResourceUtils.java   |  2 +
 .../migration/MigrationPipelineGenerator.java      | 11 +--
 ui/src/app/core-model/gen/streampipes-model.ts     | 12 ++--
 .../save-pipeline/save-pipeline.component.ts       | 11 +++
 13 files changed, 250 insertions(+), 100 deletions(-)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 457bfe1..08c25a8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -99,6 +99,10 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
   @RdfProperty(StreamPipes.ELEMENT_ENDPOINT_SERVICE_NAME)
   private String elementEndpointServiceName;
 
+  private Integer priorityScore;
+
+  private boolean preemption;
+
   //@RdfProperty(StreamPipes.PE_CONFIGURED)
   private boolean configured;
 
@@ -123,6 +127,8 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
     this.deploymentRunningInstanceId = other.getDeploymentRunningInstanceId();
     this.elementEndpointServiceName = other.getElementEndpointServiceName();
     this.correspondingUser = other.getCorrespondingUser();
+    this.preemption = other.isPreemption();
+    this.priorityScore = other.getPriorityScore();
     if (other.getStreamRequirements() != null) {
       this.streamRequirements = new Cloner().streams(other.getStreamRequirements());
     }
@@ -296,4 +302,20 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
   public void setResourceRequirements(List<NodeResourceRequirement> resourceRequirements) {
     this.resourceRequirements = resourceRequirements;
   }
+
+  public Integer getPriorityScore() {
+    return priorityScore; // boxed Integer: presumably null until a score has been set — confirm before unboxing
+  }
+
+  public void setPriorityScore(Integer priorityScore) {
+    this.priorityScore = priorityScore; // stored as given; no null or range check is applied here
+  }
+
+  public boolean isPreemption() {
+    return preemption; // PrioritySelectionStrategy only considers entities for which this is true
+  }
+
+  public void setPreemption(boolean preemption) {
+    this.preemption = preemption; // Pipeline.setPreemption calls this for every processor and sink
+  }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
index e32b229..167417d 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
@@ -170,6 +170,8 @@ public class Pipeline extends ElementComposition {
 
   public void setPriorityScore(int priorityScore) {
     this.priorityScore = priorityScore;
+    this.getSepas().forEach(processor -> processor.setPriorityScore(priorityScore)); // overwrite the score of every processor
+    this.getActions().forEach(sink -> sink.setPriorityScore(priorityScore)); // overwrite the score of every sink
   }
 
   public boolean isPreemption() {
@@ -178,6 +180,8 @@ public class Pipeline extends ElementComposition {
 
   public void setPreemption(boolean preemption) {
     this.preemption = preemption;
+    this.getSepas().forEach(processor -> processor.setPreemption(preemption)); // overwrite the flag of every processor
+    this.getActions().forEach(sink -> sink.setPreemption(preemption)); // overwrite the flag of every sink
   }
 
   public boolean isRestartOnSystemReboot() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/AutoOffloadingManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/AutoOffloadingManager.java
deleted file mode 100644
index bc30ea2..0000000
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/AutoOffloadingManager.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.controller.management.offloading;
-
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.node.controller.management.pe.InvocableElementManager;
-import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
-
-import java.util.List;
-import java.util.Random;
-
-public class AutoOffloadingManager {
-
-    public static boolean offloadRandom(){
-        List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll();
-        if(instances.size() == 0)
-            return false;
-        InvocableStreamPipesEntity randomlySelectedInstance = instances.get(new Random().nextInt(instances.size()));
-        InvocableElementManager.getInstance().postOffloadRequest(randomlySelectedInstance);
-        return true;
-    }
-
-    public static void offloadHeaviest(){
-        //TODO: Migrate the PE with the highest load on CPU (or possibly other resource)
-    }
-
-}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index e9a9c1a..6f49266 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -18,19 +18,31 @@
 
 package org.apache.streampipes.node.controller.management.offloading;
 
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.model.node.monitor.ResourceMetrics;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.node.controller.config.NodeConfiguration;
 import org.apache.streampipes.node.controller.management.offloading.model.OffloadingStrategy;
 import org.apache.streampipes.node.controller.management.pe.InvocableElementManager;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 public class OffloadingPolicyManager {
 
+    private static final String HTTP_PROTOCOL = "http://";
+    private static final String COLON = ":";
+    private static final String SLASH = "/";
+
     private final List<OffloadingStrategy<?>> offloadingStrategies = new ArrayList<>();
     private static OffloadingPolicyManager instance;
     private static final Logger LOG = LoggerFactory.getLogger(OffloadingPolicyManager.class.getCanonicalName());
@@ -63,4 +75,26 @@ public class OffloadingPolicyManager {
         this.offloadingStrategies.add(offloadingStrategy);
     }
 
+    /** Fetches the online nodes from the backend; I/O and parse failures are rethrown as SpRuntimeException. */
+    public List<NodeInfoDescription> getOnlineNodes(){
+        try {
+            String body = Request.Get(generateNodeManagementOnlineNodesEndpoint()).execute().returnContent().asString();
+            // Deserialize into a typed array: a raw ArrayList.class target makes Jackson build LinkedHashMap
+            // elements, which only fail later with a ClassCastException when used as NodeInfoDescription.
+            NodeInfoDescription[] nodes = JacksonSerializer.getObjectMapper().readValue(body, NodeInfoDescription[].class);
+            return new ArrayList<>(java.util.Arrays.asList(nodes));
+        } catch (IOException e) {
+            throw new SpRuntimeException(e);
+        }
+    }
+
+    /** Builds the URL of the backend's online-nodes endpoint from the configured backend host and port. */
+    private String generateNodeManagementOnlineNodesEndpoint() {
+        // NOTE(review): the user segment of the path is fixed to admin@streampipes.org.
+        return new StringBuilder(HTTP_PROTOCOL)
+                .append(NodeConfiguration.getBackendHost()).append(COLON)
+                .append(NodeConfiguration.getBackendPort()).append(SLASH)
+                .append("streampipes-backend/api/v2/users/admin@streampipes.org/nodes/online").toString();
+    }
+
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java
index 19e852e..22068c5 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.node.controller.management.offloading.model.proper
 import org.apache.streampipes.node.controller.management.offloading.model.property.FreeDiskSpaceResourceProperty;
 import org.apache.streampipes.node.controller.management.offloading.model.property.FreeMemoryResourceProperty;
 import org.apache.streampipes.node.controller.management.offloading.model.selection.CPULoadSelectionStrategy;
+import org.apache.streampipes.node.controller.management.offloading.model.selection.PrioritySelectionStrategy;
 import org.apache.streampipes.node.controller.management.offloading.model.selection.RandomSelectionStrategy;
 import org.apache.streampipes.node.controller.management.offloading.model.selection.SelectionStrategy;
 
@@ -41,8 +42,8 @@ public class OffloadingStrategyFactory {
                             new CPULoadResourceProperty(), new RandomSelectionStrategy());
                 case "debug":
                     return new OffloadingStrategy<Float>(new ThresholdViolationOffloadingPolicy<>(5,
-                            Comparator.GREATER,0.5f, 3),
-                            new CPULoadResourceProperty(), new RandomSelectionStrategy());
+                            Comparator.GREATER,0.5f, 1),
+                            new CPULoadResourceProperty(), new PrioritySelectionStrategy());
                 case "memory":
                     return new OffloadingStrategy<Long>(new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
                             549755813888l, 5),
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/selection/PrioritySelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/selection/PrioritySelectionStrategy.java
new file mode 100644
index 0000000..000a93b
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/selection/PrioritySelectionStrategy.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.node.controller.management.offloading.model.selection;
+
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.node.controller.management.node.NodeManager;
+import org.apache.streampipes.node.controller.management.offloading.OffloadingPolicyManager;
+import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PrioritySelectionStrategy implements SelectionStrategy{
+
+    /**
+     * Selects the preemptible running entity with the lowest priority score that another online node supports.
+     *
+     * @return the entity to offload, or {@code null} if no running entity qualifies
+     */
+    @Override
+    public InvocableStreamPipesEntity selectEntity() {
+        // Resolve the local node id once instead of once per online node inside the filter.
+        String localNodeId = NodeManager.getInstance().retrieveNodeInfoDescription().getNodeControllerId();
+        List<NodeInfoDescription> otherNodes = getNodeInfos().stream()
+                .filter(desc -> !desc.getNodeControllerId().equals(localNodeId))
+                .collect(Collectors.toList());
+
+        List<InvocableStreamPipesEntity> candidates = RunningInvocableInstances.INSTANCE.getAll().stream()
+                .filter(InvocableStreamPipesEntity::isPreemption)
+                .filter(entity -> checkIfSupported(entity, otherNodes))
+                .collect(Collectors.toList());
+        return candidates.isEmpty() ? null : lowestPriority(candidates);
+    }
+
+    /** Asks the OffloadingPolicyManager for the nodes that are currently online. */
+    private List<NodeInfoDescription> getNodeInfos(){
+        return OffloadingPolicyManager.getInstance().getOnlineNodes();
+    }
+
+    /** Returns the first entity with the smallest score; a null score ranks lowest instead of throwing an NPE. */
+    private InvocableStreamPipesEntity lowestPriority(List<InvocableStreamPipesEntity> entities){
+        return entities.stream().min(Comparator.comparing(InvocableStreamPipesEntity::getPriorityScore,
+                Comparator.nullsFirst(Comparator.naturalOrder()))).orElse(null);
+    }
+
+    /** Checks whether at least one of the given nodes lists the entity's app id among its supported elements. */
+    private boolean checkIfSupported(InvocableStreamPipesEntity entity, List<NodeInfoDescription> nodeInfos){
+        return nodeInfos.stream().anyMatch(node -> node.getSupportedElements().contains(entity.getAppId()));
+    }
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
index 671630c..33e7a23 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
@@ -41,6 +41,7 @@ import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.node.controller.config.NodeConfiguration;
 import org.apache.streampipes.node.controller.management.node.NodeManager;
 import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
+import org.apache.streampipes.node.controller.management.resource.utils.ResourceChecker;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -168,13 +169,9 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
     }
 
     public Response postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
-        try {
-            String url = generatePipelineManagementOffloadEndpoint();
-            String desc = toJson(instanceToOffload);
-            return handleResponse(Request.Post(url).bodyString(desc, ContentType.APPLICATION_JSON).execute());
-        } catch (IOException e) {
-            throw new SpRuntimeException(e);
-        }
+        Response resp = new Response();
+        post(generatePipelineManagementOffloadEndpoint(), toJson(instanceToOffload), resp);
+        return resp;
     }
 
     private void adaptPipelineDescription(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEntity){
@@ -275,47 +272,9 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
     private List<ConsumableStreamPipesEntity> getSupportedEntities(InvocableRegistration registration){
         //Check if the node supports the entity (atm only hardware requirements; could be expanded to include
         // software requirements)
+        ResourceChecker resourceChecker = new ResourceChecker(getNodeInfoDescription());
         return registration.getSupportedPipelineElements().stream()
-                .filter(this::checkGpuRequirement)
-                .filter(this::checkCpuRequirement)
-                .filter(this::checkMemoryRequirement)
-                //.filter(this::checkDiskSpaceRequirement) currently does not work (node diskspace is falsely set to 0
-                // in Node Hardware Resources, regardless of actual free diskspace) TODO: fix this issue
-                    .collect(Collectors.toList());
-    }
-
-    private boolean checkGpuRequirement(ConsumableStreamPipesEntity spEntity){
-        boolean entityRequiresGpu = spEntity.getResourceRequirements().stream()
-                .filter(req -> req instanceof Hardware).filter(req -> ((Hardware) req).isGpu()).map(req -> ((Hardware) req)
-                        .isGpu()).findFirst().orElse(false);
-        boolean nodeHasGPU = (getNodeInfoDescription().getNodeResources().getHardwareResource().getGpu().getCores()>0);
-        return (!entityRequiresGpu || nodeHasGPU);
-    }
-
-    private boolean checkCpuRequirement(ConsumableStreamPipesEntity spEntity){
-        int requiredCPUCores = spEntity.getResourceRequirements().stream()
-                .filter(req -> req instanceof Hardware).filter(req -> ((Hardware) req).isGpu())
-                .map(req -> ((Hardware) req).getCpuCores()).findFirst().orElse(0);
-        int nodeCPUCores = getNodeInfoDescription().getNodeResources().getHardwareResource().getCpu().getCores();
-        return (requiredCPUCores <= nodeCPUCores);
-    }
-
-    private boolean checkMemoryRequirement(ConsumableStreamPipesEntity spEntity){
-        long requiredMemory = spEntity.getResourceRequirements().stream()
-                .filter(req -> req instanceof Hardware).filter(req -> ((Hardware) req).isGpu())
-                .map(req -> ((Hardware) req).getMemory()).findFirst().orElse(0l);
-        //Looks at total memory (could be adjusted to consider currently available Memory)
-        long actualNodeMemory = getNodeInfoDescription().getNodeResources().getHardwareResource().getMemory().getMemTotal();
-        return (requiredMemory <= actualNodeMemory);
-    }
-
-    private boolean checkDiskSpaceRequirement(ConsumableStreamPipesEntity spEntity){
-        long requiredDiskSpace = spEntity.getResourceRequirements().stream()
-                .filter(req -> req instanceof Hardware).filter(req -> ((Hardware) req).isGpu())
-                .map(req -> ((Hardware) req).getDisk()).findFirst().orElse(0l);
-        //Looks at total diskspace (could be adjusted to consider currently available diskspace)
-        long actualNodeDiskSpace = getNodeInfoDescription().getNodeResources().getHardwareResource().getDisk().getDiskTotal();
-        return (requiredDiskSpace <= actualNodeDiskSpace);
+                .filter(resourceChecker::checkResources).collect(Collectors.toList());
     }
 
     private List<InvocableStreamPipesEntity> getAllInvocables() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java
index 44984db..8b56c82 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java
@@ -22,7 +22,8 @@ public enum FileSystemType {
     NVME("/dev/nvme"),
     DISK("/dev/disk"),
     ROOT("/dev/root"),
-    MMCBLK("/dev/mmcblk0p1");
+    MMCBLK("/dev/mmcblk0p1"),
+    SDB("/dev/sdb");
 
     private final String name;
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceChecker.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceChecker.java
new file mode 100644
index 0000000..9af923b
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceChecker.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.node.controller.management.resource.utils;
+
+import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.model.resource.Hardware;
+
+import java.util.Optional;
+
+public class ResourceChecker {
+    private final NodeInfoDescription nodeInfo;
+
+    /** Creates a checker that compares entity requirements against the given node description. */
+    public ResourceChecker(NodeInfoDescription nodeInfo){
+        this.nodeInfo = nodeInfo;
+    }
+
+    /** Returns true only if the CPU, GPU, memory and disk checks all pass, evaluated in that order. */
+    public boolean checkResources(ConsumableStreamPipesEntity entity){
+        return checkCpuRequirement(entity)
+                && checkGpuRequirement(entity)
+                && checkMemoryRequirement(entity)
+                && checkDiskSpaceRequirement(entity);
+    }
+
+    /** Passes unless the entity asks for a GPU and the node reports no GPU cores. */
+    private boolean checkGpuRequirement(ConsumableStreamPipesEntity spEntity){
+        Optional<Hardware> requirement = extractHardware(spEntity);
+        boolean gpuNeeded = requirement.isPresent() && requirement.get().isGpu();
+        boolean gpuAvailable = nodeInfo.getNodeResources().getHardwareResource().getGpu().getCores() > 0;
+        return gpuAvailable || !gpuNeeded;
+    }
+
+    /** Passes if the required CPU core count (0 without a hardware requirement) fits the node's core count. */
+    private boolean checkCpuRequirement(ConsumableStreamPipesEntity spEntity){
+        Optional<Hardware> requirement = extractHardware(spEntity);
+        int coresNeeded = requirement.isPresent() ? requirement.get().getCpuCores() : 0;
+        int coresAvailable = nodeInfo.getNodeResources().getHardwareResource().getCpu().getCores();
+        return coresAvailable >= coresNeeded;
+    }
+
+    /** Passes if the required memory (0 without a hardware requirement) fits the node's total memory. */
+    private boolean checkMemoryRequirement(ConsumableStreamPipesEntity spEntity){
+        Optional<Hardware> requirement = extractHardware(spEntity);
+        long memoryNeeded = requirement.isPresent() ? requirement.get().getMemory() : 0L;
+        // Compared against the node's total memory, not the memory that is currently free.
+        long memoryAvailable = nodeInfo.getNodeResources().getHardwareResource().getMemory().getMemTotal();
+        return memoryAvailable >= memoryNeeded;
+    }
+
+    /** Passes if the required disk space (0 without a hardware requirement) fits the node's total disk space. */
+    private boolean checkDiskSpaceRequirement(ConsumableStreamPipesEntity spEntity){
+        Optional<Hardware> requirement = extractHardware(spEntity);
+        long diskNeeded = requirement.isPresent() ? requirement.get().getDisk() : 0L;
+        // Compared against the node's total disk space, not the space that is currently free.
+        long diskAvailable = nodeInfo.getNodeResources().getHardwareResource().getDisk().getDiskTotal();
+        return diskAvailable >= diskNeeded;
+    }
+
+    /** Finds the first Hardware entry among the entity's resource requirements, if there is one. */
+    private Optional<Hardware> extractHardware(ConsumableStreamPipesEntity spEntity){
+        return spEntity.getResourceRequirements().stream()
+                .filter(Hardware.class::isInstance).map(Hardware.class::cast).findFirst();
+    }
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java
index 603994f..a584397 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java
@@ -55,6 +55,8 @@ public class ResourceUtils {
             } else if (volume.contains(FileSystemType.MMCBLK.getName())){
                 // Docker in Jetson Nano
                 addDiskUsage(diskUsage, f);
+            } else if (volume.contains(FileSystemType.SDB.getName())){
+                addDiskUsage(diskUsage, f);
             }
         }
         return diskUsage.isEmpty() ? defaultDiskUsage() : diskUsage;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index 057d8a9..b73c479 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@ -35,6 +35,8 @@ public class MigrationPipelineGenerator {
 
     private InvocableStreamPipesEntity entityToMigrate;
     private Pipeline correspondingPipeline;
+    private final float memoryMultiplier = 0.9F;
+    private final float diskSpaceMultiplier = 0.9F;
 
     public MigrationPipelineGenerator(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
         this.entityToMigrate = entityToMigrate;
@@ -47,7 +49,7 @@ public class MigrationPipelineGenerator {
         List<NodeInfoDescription> possibleTargetNodes = getNodeInfos();
 
         switch(correspondingPipeline.getExecutionPolicy()){
-            case "custom":
+            case "custom": //TODO: Enum class
                 possibleTargetNodes = filterLocationTags(possibleTargetNodes);
             case "locality-aware":
                 //TODO: incorporate strategy for locality-aware deployment
@@ -101,10 +103,11 @@ public class MigrationPipelineGenerator {
             Hardware hardware = entityToMigrate.getResourceRequirements().stream()
                             .filter(nodeRR -> nodeRR instanceof Hardware).map(nodeRR -> (Hardware)nodeRR).findFirst().
                             orElse(null);
-            if(hardware != null){
+            if(hardware != null){ //TODO: Map CPU load ()
+                //Does produce empty list if no hardware requirements are defined
                 if (rmHistory.peek() != null
-                        && hardware.getDisk() <= rmHistory.peek().getFreeDiskSpaceInBytes()
-                        && hardware.getMemory() <= rmHistory.peek().getFreeMemoryInBytes()) {
+                        && hardware.getDisk() <= diskSpaceMultiplier * rmHistory.peek().getFreeDiskSpaceInBytes()
+                        && hardware.getMemory() <= memoryMultiplier * rmHistory.peek().getFreeMemoryInBytes()) {
                     filteredTargetNodes.add(nodeInfo);
                 }
             }
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 67b2985..9ae5263 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,7 +19,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2021-04-30 09:41:31.
+// Generated using typescript-generator version 2.27.744 on 2021-05-10 21:29:45.
 
 export class AbstractStreamPipesEntity {
     "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
         instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
         instance.internallyManaged = data.internallyManaged;
         instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
-        instance.uri = data.uri;
         instance.dom = data.dom;
+        instance.uri = data.uri;
         return instance;
     }
 }
@@ -1044,6 +1044,8 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
     elementEndpointPort: number;
     elementEndpointServiceName: string;
     inputStreams: SpDataStreamUnion[];
+    preemption: boolean;
+    priorityScore: number;
     resourceRequirements: NodeResourceRequirementUnion[];
     staticProperties: StaticPropertyUnion[];
     statusInfoSettings: ElementStatusInfoSettings;
@@ -1073,6 +1075,8 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
         instance.deploymentTargetNodePort = data.deploymentTargetNodePort;
         instance.deploymentRunningInstanceId = data.deploymentRunningInstanceId;
         instance.elementEndpointServiceName = data.elementEndpointServiceName;
+        instance.priorityScore = data.priorityScore;
+        instance.preemption = data.preemption;
         instance.configured = data.configured;
         instance.uncompleted = data.uncompleted;
         return instance;
@@ -1834,8 +1838,8 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
         }
         const instance = target || new GenericAdapterSetDescription();
         super.fromData(data, instance);
-        instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
         instance.formatDescription = FormatDescription.fromData(data.formatDescription);
+        instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
         instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
@@ -1853,8 +1857,8 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
         }
         const instance = target || new GenericAdapterStreamDescription();
         super.fromData(data, instance);
-        instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
         instance.formatDescription = FormatDescription.fromData(data.formatDescription);
+        instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
         instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index 50d6a09..481130f 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -325,6 +325,13 @@ export class SavePipelineComponent implements OnInit {
     })
   }
 
+  /** Copies the dialog's preemption choice and the pipeline's priority score onto each given element. */
+  modifyPipelineElementsPreemption(pipelineElements){
+    for (const element of pipelineElements) {
+      Object.assign(element, {preemption: this.selectedPreemption, priorityScore: this.tmpPipeline.priorityScore});
+    }
+  }
+
   savePipeline(switchTab) {
     if (this.tmpPipeline.name == "") {
       //this.showToast("error", "Please enter a name for your pipeline");
@@ -344,6 +351,8 @@ export class SavePipelineComponent implements OnInit {
       } else {
         this.tmpPipeline.priorityScore = 0;
       }
+      this.modifyPipelineElementsPreemption(this.tmpPipeline.sepas);
+      this.modifyPipelineElementsPreemption(this.tmpPipeline.actions);
       if (this.selectedNodeTags?.length > 0 && this.selectedPipelineExecutionPolicy === "custom") {
         this.tmpPipeline.nodeTags = this.selectedNodeTags;
       } else {
@@ -364,6 +373,8 @@ export class SavePipelineComponent implements OnInit {
       } else {
         this.tmpPipeline.priorityScore = 0;
       }
+      this.modifyPipelineElementsPreemption(this.tmpPipeline.sepas);
+      this.modifyPipelineElementsPreemption(this.tmpPipeline.actions);
       if (this.selectedNodeTags?.length > 0 && this.selectedPipelineExecutionPolicy === "custom") {
         this.tmpPipeline.nodeTags = this.selectedNodeTags;
       } else {