You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/05/10 21:03:18 UTC
[incubator-streampipes] 07/07: Refactoring and added priority based
offloading
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 589dec6de316e969f6c27f9ca5f343260b3efe07
Author: Daniel Gomm <da...@outlook.de>
AuthorDate: Mon May 10 23:01:14 2021 +0200
Refactoring and added priority based offloading
---
.../model/base/InvocableStreamPipesEntity.java | 22 ++++++
.../streampipes/model/pipeline/Pipeline.java | 4 ++
.../offloading/AutoOffloadingManager.java | 42 -----------
.../offloading/OffloadingPolicyManager.java | 34 +++++++++
.../model/OffloadingStrategyFactory.java | 5 +-
.../model/selection/PrioritySelectionStrategy.java | 69 ++++++++++++++++++
.../management/pe/InvocableElementManager.java | 53 ++------------
.../management/resource/utils/FileSystemType.java | 3 +-
.../management/resource/utils/ResourceChecker.java | 82 ++++++++++++++++++++++
.../management/resource/utils/ResourceUtils.java | 2 +
.../migration/MigrationPipelineGenerator.java | 11 +--
ui/src/app/core-model/gen/streampipes-model.ts | 12 ++--
.../save-pipeline/save-pipeline.component.ts | 11 +++
13 files changed, 250 insertions(+), 100 deletions(-)
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 457bfe1..08c25a8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -99,6 +99,10 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
@RdfProperty(StreamPipes.ELEMENT_ENDPOINT_SERVICE_NAME)
private String elementEndpointServiceName;
+ private Integer priorityScore;
+
+ private boolean preemption;
+
//@RdfProperty(StreamPipes.PE_CONFIGURED)
private boolean configured;
@@ -123,6 +127,8 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
this.deploymentRunningInstanceId = other.getDeploymentRunningInstanceId();
this.elementEndpointServiceName = other.getElementEndpointServiceName();
this.correspondingUser = other.getCorrespondingUser();
+ this.preemption = other.isPreemption();
+ this.priorityScore = other.getPriorityScore();
if (other.getStreamRequirements() != null) {
this.streamRequirements = new Cloner().streams(other.getStreamRequirements());
}
@@ -296,4 +302,20 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
public void setResourceRequirements(List<NodeResourceRequirement> resourceRequirements) {
this.resourceRequirements = resourceRequirements;
}
+
+ public Integer getPriorityScore() {
+ return priorityScore;
+ }
+
+ public void setPriorityScore(Integer priorityScore) {
+ this.priorityScore = priorityScore;
+ }
+
+ public boolean isPreemption() {
+ return preemption;
+ }
+
+ public void setPreemption(boolean preemption) {
+ this.preemption = preemption;
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
index e32b229..167417d 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
@@ -170,6 +170,8 @@ public class Pipeline extends ElementComposition {
public void setPriorityScore(int priorityScore) {
this.priorityScore = priorityScore;
+ this.getSepas().forEach(processor -> processor.setPriorityScore(priorityScore));
+ this.getActions().forEach(sink -> sink.setPriorityScore(priorityScore));
}
public boolean isPreemption() {
@@ -178,6 +180,8 @@ public class Pipeline extends ElementComposition {
public void setPreemption(boolean preemption) {
this.preemption = preemption;
+ this.getSepas().forEach(processor -> processor.setPreemption(preemption));
+ this.getActions().forEach(sink -> sink.setPreemption(preemption));
}
public boolean isRestartOnSystemReboot() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/AutoOffloadingManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/AutoOffloadingManager.java
deleted file mode 100644
index bc30ea2..0000000
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/AutoOffloadingManager.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.controller.management.offloading;
-
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.node.controller.management.pe.InvocableElementManager;
-import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
-
-import java.util.List;
-import java.util.Random;
-
-public class AutoOffloadingManager {
-
- public static boolean offloadRandom(){
- List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll();
- if(instances.size() == 0)
- return false;
- InvocableStreamPipesEntity randomlySelectedInstance = instances.get(new Random().nextInt(instances.size()));
- InvocableElementManager.getInstance().postOffloadRequest(randomlySelectedInstance);
- return true;
- }
-
- public static void offloadHeaviest(){
- //TODO: Migrate the PE with the highest load on CPU (or possibly other resource)
- }
-
-}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index e9a9c1a..6f49266 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -18,19 +18,31 @@
package org.apache.streampipes.node.controller.management.offloading;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.Response;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.node.NodeInfoDescription;
import org.apache.streampipes.model.node.monitor.ResourceMetrics;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.node.controller.config.NodeConfiguration;
import org.apache.streampipes.node.controller.management.offloading.model.OffloadingStrategy;
import org.apache.streampipes.node.controller.management.pe.InvocableElementManager;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class OffloadingPolicyManager {
+ private static final String HTTP_PROTOCOL = "http://";
+ private static final String COLON = ":";
+ private static final String SLASH = "/";
+
private final List<OffloadingStrategy<?>> offloadingStrategies = new ArrayList<>();
private static OffloadingPolicyManager instance;
private static final Logger LOG = LoggerFactory.getLogger(OffloadingPolicyManager.class.getCanonicalName());
@@ -63,4 +75,26 @@ public class OffloadingPolicyManager {
this.offloadingStrategies.add(offloadingStrategy);
}
+ /** Queries the backend's nodes/online endpoint; wraps any I/O failure in an SpRuntimeException. */
+ public List<NodeInfoDescription> getOnlineNodes(){
+   try {
+     String json = Request.Get(generateNodeManagementOnlineNodesEndpoint()).execute().returnContent().asString();
+     // Bind the element type explicitly: readValue(json, ArrayList.class) erases it, so untyped binding
+     // yields map elements that throw ClassCastException once a caller treats them as NodeInfoDescription.
+     return JacksonSerializer.getObjectMapper().readValue(json, JacksonSerializer.getObjectMapper()
+         .getTypeFactory().constructCollectionType(ArrayList.class, NodeInfoDescription.class));
+   } catch (IOException e) {
+     throw new SpRuntimeException(e);
+   }
+ }
+
+ /** Builds the backend URL of the nodes/online route; the admin user segment of the path is hard-coded. */
+ private String generateNodeManagementOnlineNodesEndpoint() {
+   final String onlineNodesRoute = "streampipes-backend/api/v2/users/admin@streampipes.org/nodes/online";
+   StringBuilder endpoint = new StringBuilder(HTTP_PROTOCOL);
+   endpoint.append(NodeConfiguration.getBackendHost()).append(COLON);
+   endpoint.append(NodeConfiguration.getBackendPort()).append(SLASH);
+   return endpoint.append(onlineNodesRoute).toString();
+ }
+
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java
index 19e852e..22068c5 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.node.controller.management.offloading.model.proper
import org.apache.streampipes.node.controller.management.offloading.model.property.FreeDiskSpaceResourceProperty;
import org.apache.streampipes.node.controller.management.offloading.model.property.FreeMemoryResourceProperty;
import org.apache.streampipes.node.controller.management.offloading.model.selection.CPULoadSelectionStrategy;
+import org.apache.streampipes.node.controller.management.offloading.model.selection.PrioritySelectionStrategy;
import org.apache.streampipes.node.controller.management.offloading.model.selection.RandomSelectionStrategy;
import org.apache.streampipes.node.controller.management.offloading.model.selection.SelectionStrategy;
@@ -41,8 +42,8 @@ public class OffloadingStrategyFactory {
new CPULoadResourceProperty(), new RandomSelectionStrategy());
case "debug":
return new OffloadingStrategy<Float>(new ThresholdViolationOffloadingPolicy<>(5,
- Comparator.GREATER,0.5f, 3),
- new CPULoadResourceProperty(), new RandomSelectionStrategy());
+ Comparator.GREATER,0.5f, 1),
+ new CPULoadResourceProperty(), new PrioritySelectionStrategy());
case "memory":
return new OffloadingStrategy<Long>(new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
549755813888l, 5),
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/selection/PrioritySelectionStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/selection/PrioritySelectionStrategy.java
new file mode 100644
index 0000000..000a93b
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/selection/PrioritySelectionStrategy.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.node.controller.management.offloading.model.selection;
+
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.node.controller.management.node.NodeManager;
+import org.apache.streampipes.node.controller.management.offloading.OffloadingPolicyManager;
+import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+ /**
+  * Selects the preemptible running pipeline element with the lowest priority score that at least one
+  * other online node is able to host.
+  */
+ public class PrioritySelectionStrategy implements SelectionStrategy{
+
+   /**
+    * @return the offloading candidate with the lowest priority score, or {@code null} if no running element
+    * is both preemptible and supported by another online node
+    */
+   @Override
+   public InvocableStreamPipesEntity selectEntity() {
+     List<InvocableStreamPipesEntity> instances = RunningInvocableInstances.INSTANCE.getAll();
+     // Resolve the local controller id once, outside the per-node filter
+     String localNodeId = NodeManager.getInstance().retrieveNodeInfoDescription().getNodeControllerId();
+     List<NodeInfoDescription> otherOnlineNodes = OffloadingPolicyManager.getInstance().getOnlineNodes().stream()
+         .filter(desc -> !desc.getNodeControllerId().equals(localNodeId))
+         .collect(Collectors.toList());
+
+     return instances.stream()
+         .filter(InvocableStreamPipesEntity::isPreemption)
+         .filter(entity -> checkIfSupported(entity, otherOnlineNodes))
+         .min(Comparator.comparingInt(PrioritySelectionStrategy::priorityOf))
+         .orElse(null);
+   }
+
+   // The score is a nullable Integer without a default value; unboxing null inside comparingInt would
+   // throw a NullPointerException, so a missing score counts as 0.
+   private static int priorityOf(InvocableStreamPipesEntity entity){
+     Integer score = entity.getPriorityScore();
+     return score == null ? 0 : score;
+   }
+
+   // True if at least one of the given nodes lists the entity's app id among its supported elements.
+   private boolean checkIfSupported(InvocableStreamPipesEntity entity, List<NodeInfoDescription> nodeInfos){
+     return nodeInfos.stream().anyMatch(node -> node.getSupportedElements().contains(entity.getAppId()));
+   }
+ }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
index 671630c..33e7a23 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
@@ -41,6 +41,7 @@ import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.node.controller.config.NodeConfiguration;
import org.apache.streampipes.node.controller.management.node.NodeManager;
import org.apache.streampipes.node.controller.management.pe.storage.RunningInvocableInstances;
+import org.apache.streampipes.node.controller.management.resource.utils.ResourceChecker;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,13 +169,9 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
}
public Response postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
- try {
- String url = generatePipelineManagementOffloadEndpoint();
- String desc = toJson(instanceToOffload);
- return handleResponse(Request.Post(url).bodyString(desc, ContentType.APPLICATION_JSON).execute());
- } catch (IOException e) {
- throw new SpRuntimeException(e);
- }
+ Response resp = new Response();
+ post(generatePipelineManagementOffloadEndpoint(), toJson(instanceToOffload), resp);
+ return resp;
}
private void adaptPipelineDescription(InvocableStreamPipesEntity graph, PipelineElementReconfigurationEntity reconfigurationEntity){
@@ -275,47 +272,9 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
private List<ConsumableStreamPipesEntity> getSupportedEntities(InvocableRegistration registration){
//Check if the node supports the entity (atm only hardware requirements; could be expanded to include
// software requirements)
+ ResourceChecker resourceChecker = new ResourceChecker(getNodeInfoDescription());
return registration.getSupportedPipelineElements().stream()
- .filter(this::checkGpuRequirement)
- .filter(this::checkCpuRequirement)
- .filter(this::checkMemoryRequirement)
- //.filter(this::checkDiskSpaceRequirement) currently does not work (node diskspace is falsely set to 0
- // in Node Hardware Resources, regardless of actual free diskspace) TODO: fix this issue
- .collect(Collectors.toList());
- }
-
- private boolean checkGpuRequirement(ConsumableStreamPipesEntity spEntity){
- boolean entityRequiresGpu = spEntity.getResourceRequirements().stream()
- .filter(req -> req instanceof Hardware).filter(req -> ((Hardware) req).isGpu()).map(req -> ((Hardware) req)
- .isGpu()).findFirst().orElse(false);
- boolean nodeHasGPU = (getNodeInfoDescription().getNodeResources().getHardwareResource().getGpu().getCores()>0);
- return (!entityRequiresGpu || nodeHasGPU);
- }
-
- private boolean checkCpuRequirement(ConsumableStreamPipesEntity spEntity){
- int requiredCPUCores = spEntity.getResourceRequirements().stream()
- .filter(req -> req instanceof Hardware).filter(req -> ((Hardware) req).isGpu())
- .map(req -> ((Hardware) req).getCpuCores()).findFirst().orElse(0);
- int nodeCPUCores = getNodeInfoDescription().getNodeResources().getHardwareResource().getCpu().getCores();
- return (requiredCPUCores <= nodeCPUCores);
- }
-
- private boolean checkMemoryRequirement(ConsumableStreamPipesEntity spEntity){
- long requiredMemory = spEntity.getResourceRequirements().stream()
- .filter(req -> req instanceof Hardware).filter(req -> ((Hardware) req).isGpu())
- .map(req -> ((Hardware) req).getMemory()).findFirst().orElse(0l);
- //Looks at total memory (could be adjusted to consider currently available Memory)
- long actualNodeMemory = getNodeInfoDescription().getNodeResources().getHardwareResource().getMemory().getMemTotal();
- return (requiredMemory <= actualNodeMemory);
- }
-
- private boolean checkDiskSpaceRequirement(ConsumableStreamPipesEntity spEntity){
- long requiredDiskSpace = spEntity.getResourceRequirements().stream()
- .filter(req -> req instanceof Hardware).filter(req -> ((Hardware) req).isGpu())
- .map(req -> ((Hardware) req).getDisk()).findFirst().orElse(0l);
- //Looks at total diskspace (could be adjusted to consider currently available diskspace)
- long actualNodeDiskSpace = getNodeInfoDescription().getNodeResources().getHardwareResource().getDisk().getDiskTotal();
- return (requiredDiskSpace <= actualNodeDiskSpace);
+ .filter(resourceChecker::checkResources).collect(Collectors.toList());
}
private List<InvocableStreamPipesEntity> getAllInvocables() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java
index 44984db..8b56c82 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java
@@ -22,7 +22,8 @@ public enum FileSystemType {
NVME("/dev/nvme"),
DISK("/dev/disk"),
ROOT("/dev/root"),
- MMCBLK("/dev/mmcblk0p1");
+ MMCBLK("/dev/mmcblk0p1"),
+ SDB("/dev/sdb");
private final String name;
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceChecker.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceChecker.java
new file mode 100644
index 0000000..9af923b
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceChecker.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.node.controller.management.resource.utils;
+
+import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.model.resource.Hardware;
+
+import java.util.Optional;
+
+ /**
+  * Compares the hardware requirements declared by a pipeline element with the hardware resources
+  * reported in a node description.
+  */
+ public class ResourceChecker {
+   private final NodeInfoDescription nodeInfo;
+
+   /** @param nodeInfo description of the node whose hardware resources the checks read */
+   public ResourceChecker(NodeInfoDescription nodeInfo){
+     this.nodeInfo = nodeInfo;
+   }
+
+   /** Runs the CPU, GPU, memory and disk checks in that order, stopping at the first one that fails. */
+   public boolean checkResources(ConsumableStreamPipesEntity entity){
+     if (!checkCpuRequirement(entity) || !checkGpuRequirement(entity)) {
+       return false;
+     }
+     return checkMemoryRequirement(entity) && checkDiskSpaceRequirement(entity);
+   }
+
+   // An element that does not require a GPU always passes; otherwise the node must report GPU cores.
+   private boolean checkGpuRequirement(ConsumableStreamPipesEntity spEntity){
+     Optional<Hardware> requirement = extractHardware(spEntity);
+     boolean gpuRequired = requirement.isPresent() && requirement.get().isGpu();
+     boolean gpuAvailable = nodeInfo.getNodeResources().getHardwareResource().getGpu().getCores() > 0;
+     return gpuAvailable || !gpuRequired;
+   }
+
+   // The required core count falls back to 0 when the element declares no hardware requirement.
+   private boolean checkCpuRequirement(ConsumableStreamPipesEntity spEntity){
+     Optional<Hardware> requirement = extractHardware(spEntity);
+     int neededCores = requirement.isPresent() ? requirement.get().getCpuCores() : 0;
+     return neededCores <= nodeInfo.getNodeResources().getHardwareResource().getCpu().getCores();
+   }
+
+   // Compares against the node's total memory (could be adjusted to consider currently available memory).
+   private boolean checkMemoryRequirement(ConsumableStreamPipesEntity spEntity){
+     Optional<Hardware> requirement = extractHardware(spEntity);
+     long neededMemory = requirement.isPresent() ? requirement.get().getMemory() : 0L;
+     return neededMemory <= nodeInfo.getNodeResources().getHardwareResource().getMemory().getMemTotal();
+   }
+
+   // Compares against the node's total disk space (could be adjusted to consider currently free space).
+   private boolean checkDiskSpaceRequirement(ConsumableStreamPipesEntity spEntity){
+     Optional<Hardware> requirement = extractHardware(spEntity);
+     long neededDiskSpace = requirement.isPresent() ? requirement.get().getDisk() : 0L;
+     return neededDiskSpace <= nodeInfo.getNodeResources().getHardwareResource().getDisk().getDiskTotal();
+   }
+
+   // First Hardware entry among the element's resource requirements, if there is one.
+   private Optional<Hardware> extractHardware(ConsumableStreamPipesEntity spEntity){
+     return spEntity.getResourceRequirements().stream()
+         .filter(Hardware.class::isInstance).map(Hardware.class::cast)
+         .findFirst();
+   }
+ }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java
index 603994f..a584397 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java
@@ -55,6 +55,8 @@ public class ResourceUtils {
} else if (volume.contains(FileSystemType.MMCBLK.getName())){
// Docker in Jetson Nano
addDiskUsage(diskUsage, f);
+ } else if (volume.contains(FileSystemType.SDB.getName())){
+ addDiskUsage(diskUsage, f);
}
}
return diskUsage.isEmpty() ? defaultDiskUsage() : diskUsage;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index 057d8a9..b73c479 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@ -35,6 +35,8 @@ public class MigrationPipelineGenerator {
private InvocableStreamPipesEntity entityToMigrate;
private Pipeline correspondingPipeline;
+ private final float memoryMultiplier = 0.9F;
+ private final float diskSpaceMultiplier = 0.9F;
public MigrationPipelineGenerator(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
this.entityToMigrate = entityToMigrate;
@@ -47,7 +49,7 @@ public class MigrationPipelineGenerator {
List<NodeInfoDescription> possibleTargetNodes = getNodeInfos();
switch(correspondingPipeline.getExecutionPolicy()){
- case "custom":
+ case "custom": //TODO: Enum class
possibleTargetNodes = filterLocationTags(possibleTargetNodes);
case "locality-aware":
//TODO: incorporate strategy for locality-aware deployment
@@ -101,10 +103,11 @@ public class MigrationPipelineGenerator {
Hardware hardware = entityToMigrate.getResourceRequirements().stream()
.filter(nodeRR -> nodeRR instanceof Hardware).map(nodeRR -> (Hardware)nodeRR).findFirst().
orElse(null);
- if(hardware != null){
+ if(hardware != null){ //TODO: Map CPU load ()
+ //Does produce empty list if no hardware requirements are defined
if (rmHistory.peek() != null
- && hardware.getDisk() <= rmHistory.peek().getFreeDiskSpaceInBytes()
- && hardware.getMemory() <= rmHistory.peek().getFreeMemoryInBytes()) {
+ && hardware.getDisk() <= diskSpaceMultiplier * rmHistory.peek().getFreeDiskSpaceInBytes()
+ && hardware.getMemory() <= memoryMultiplier * rmHistory.peek().getFreeMemoryInBytes()) {
filteredTargetNodes.add(nodeInfo);
}
}
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 67b2985..9ae5263 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,7 +19,7 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2021-04-30 09:41:31.
+// Generated using typescript-generator version 2.27.744 on 2021-05-10 21:29:45.
export class AbstractStreamPipesEntity {
"@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
instance.internallyManaged = data.internallyManaged;
instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
- instance.uri = data.uri;
instance.dom = data.dom;
+ instance.uri = data.uri;
return instance;
}
}
@@ -1044,6 +1044,8 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
elementEndpointPort: number;
elementEndpointServiceName: string;
inputStreams: SpDataStreamUnion[];
+ preemption: boolean;
+ priorityScore: number;
resourceRequirements: NodeResourceRequirementUnion[];
staticProperties: StaticPropertyUnion[];
statusInfoSettings: ElementStatusInfoSettings;
@@ -1073,6 +1075,8 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
instance.deploymentTargetNodePort = data.deploymentTargetNodePort;
instance.deploymentRunningInstanceId = data.deploymentRunningInstanceId;
instance.elementEndpointServiceName = data.elementEndpointServiceName;
+ instance.priorityScore = data.priorityScore;
+ instance.preemption = data.preemption;
instance.configured = data.configured;
instance.uncompleted = data.uncompleted;
return instance;
@@ -1834,8 +1838,8 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
}
const instance = target || new GenericAdapterSetDescription();
super.fromData(data, instance);
- instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
instance.formatDescription = FormatDescription.fromData(data.formatDescription);
+ instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
instance.eventSchema = EventSchema.fromData(data.eventSchema);
return instance;
}
@@ -1853,8 +1857,8 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
}
const instance = target || new GenericAdapterStreamDescription();
super.fromData(data, instance);
- instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
instance.formatDescription = FormatDescription.fromData(data.formatDescription);
+ instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
instance.eventSchema = EventSchema.fromData(data.eventSchema);
return instance;
}
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index 50d6a09..481130f 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -325,6 +325,13 @@ export class SavePipelineComponent implements OnInit {
})
}
+ // Copies this component's selected preemption flag and the priority score of the pipeline being
+ // saved onto every element of the given list.
+ modifyPipelineElementsPreemption(pipelineElements){
+   pipelineElements.forEach(element => Object.assign(element,
+     {preemption: this.selectedPreemption, priorityScore: this.tmpPipeline.priorityScore}));
+ }
+
savePipeline(switchTab) {
if (this.tmpPipeline.name == "") {
//this.showToast("error", "Please enter a name for your pipeline");
@@ -344,6 +351,8 @@ export class SavePipelineComponent implements OnInit {
} else {
this.tmpPipeline.priorityScore = 0;
}
+ this.modifyPipelineElementsPreemption(this.tmpPipeline.sepas);
+ this.modifyPipelineElementsPreemption(this.tmpPipeline.actions);
if (this.selectedNodeTags?.length > 0 && this.selectedPipelineExecutionPolicy === "custom") {
this.tmpPipeline.nodeTags = this.selectedNodeTags;
} else {
@@ -364,6 +373,8 @@ export class SavePipelineComponent implements OnInit {
} else {
this.tmpPipeline.priorityScore = 0;
}
+ this.modifyPipelineElementsPreemption(this.tmpPipeline.sepas);
+ this.modifyPipelineElementsPreemption(this.tmpPipeline.actions);
if (this.selectedNodeTags?.length > 0 && this.selectedPipelineExecutionPolicy === "custom") {
this.tmpPipeline.nodeTags = this.selectedNodeTags;
} else {