You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/05/10 21:03:15 UTC
[incubator-streampipes] 04/07: Refactored and standardized
offloading
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit d27684af4bb2c70c8aa96a1301d334ce7f97fc47
Author: Daniel Gomm <da...@outlook.de>
AuthorDate: Thu May 6 13:43:36 2021 +0200
Refactored and standardized offloading
---
.../backend/StreamPipesBackendApplication.java | 2 +
.../model/message/NotificationType.java | 1 +
.../model/resource}/ResourceMetrics.java | 2 +-
.../offloading/OffloadingPolicyManager.java | 17 +++-
.../offloading/model/OffloadingStrategy.java | 7 ++
.../model/OffloadingStrategyFactory.java | 59 +++++++++++++
.../model/property/CPULoadResourceProperty.java | 2 +-
.../property/FreeDiskSpaceResourceProperty.java | 2 +-
.../model/property/FreeMemoryResourceProperty.java | 2 +-
.../model/property/ResourceProperty.java | 2 +-
.../management/pe/InvocableElementManager.java | 57 ++++++++++---
.../management/resource/ResourceManager.java | 12 +--
.../migration/MigrationPipelineGenerator.java | 99 ++++++++++++++++++----
.../manager/node/StreamPipesClusterManager.java | 7 +-
.../resources/ClusterResourceManager.java | 95 ++++++++++++---------
.../org/apache/streampipes/rest/impl/Node.java | 4 -
16 files changed, 277 insertions(+), 93 deletions(-)
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 899eb8d..aced851 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.backend;
import org.apache.shiro.web.env.EnvironmentLoaderListener;
import org.apache.shiro.web.servlet.OncePerRequestFilter;
import org.apache.shiro.web.servlet.ShiroFilter;
+import org.apache.streampipes.manager.node.management.resources.ClusterResourceManager;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
@@ -60,6 +61,7 @@ public class StreamPipesBackendApplication {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.schedule(this::startAllPreviouslyStoppedPipelines, 5, TimeUnit.SECONDS);
+ executorService.scheduleAtFixedRate(ClusterResourceManager.getInstance()::checkResources, 30l, 60l, TimeUnit.SECONDS);
}
@PreDestroy
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
index 270d77d..a0a8e7d 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
@@ -33,6 +33,7 @@ public enum NotificationType {
NO_MATCHING_PROTOCOL_CONNECTION("Not a valid connection", "No supported input protocol matches provided output protocol"),
REMOTE_SERVER_NOT_ACCESSIBLE("Can't connect to remote server", "Please contact the admin of the system"),
NO_MATCHING_SCHEME("The JSON from the server is not valid", "The keys in the element description don't map the keys in the JSON response"),
+ NO_NODE_FOUND("No node found", "No node fulfilling the requirements is available"),
LOGIN_FAILED("Login failed", "Please re-enter your password"),
LOGIN_SUCCESS("Login success", ""),
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/ResourceMetrics.java
similarity index 97%
rename from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/resource/ResourceMetrics.java
index ddaaff6..a70bb4b 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/ResourceMetrics.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.node.controller.management.resource.model;
+package org.apache.streampipes.model.resource;
public class ResourceMetrics {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index f041695..ed0d922 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -18,9 +18,13 @@
package org.apache.streampipes.node.controller.management.offloading;
+import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.node.controller.management.offloading.model.OffloadingStrategy;
import org.apache.streampipes.node.controller.management.pe.InvocableElementManager;
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.resource.ResourceMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -29,6 +33,7 @@ public class OffloadingPolicyManager {
private final List<OffloadingStrategy<?>> offloadingStrategies = new ArrayList<>();
private static OffloadingPolicyManager instance;
+ private static final Logger LOG = LoggerFactory.getLogger(OffloadingPolicyManager.class.getCanonicalName());
public static OffloadingPolicyManager getInstance(){
if(instance == null){
@@ -41,7 +46,15 @@ public class OffloadingPolicyManager {
for(OffloadingStrategy strategy:offloadingStrategies){
strategy.getOffloadingPolicy().addValue(strategy.getResourceProperty().getProperty(rm));
if(strategy.getOffloadingPolicy().isViolated()){
- InvocableElementManager.getInstance().postOffloadRequest(strategy.getSelectionStrategy().selectEntity());
+ InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().selectEntity();
+ if(offloadEntity != null){
+ Response resp = InvocableElementManager.getInstance().postOffloadRequest(offloadEntity);
+ if(resp.isSuccess())
+ LOG.info("Successfully offloaded: " + offloadEntity.getAppId()
+ + " from Pipeline: " + offloadEntity.getCorrespondingPipeline());
+ else LOG.info("Failed to offload: " + offloadEntity.getAppId()
+ + " from Pipeline: " + offloadEntity.getCorrespondingPipeline());
+ }else LOG.info("No entity to offload found");
}
}
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategy.java
index 8a6218d..d359508 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategy.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategy.java
@@ -21,17 +21,24 @@ package org.apache.streampipes.node.controller.management.offloading.model;
import org.apache.streampipes.node.controller.management.offloading.model.policies.OffloadingPolicy;
import org.apache.streampipes.node.controller.management.offloading.model.property.ResourceProperty;
import org.apache.streampipes.node.controller.management.offloading.model.selection.SelectionStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class OffloadingStrategy<T> {
private SelectionStrategy selectionStrategy;
private OffloadingPolicy<T> offloadingPolicy;
private ResourceProperty<T> resourceProperty;
+ private static final Logger LOG = LoggerFactory.getLogger(OffloadingStrategy.class.getCanonicalName());
public OffloadingStrategy(OffloadingPolicy offloadingPolicy, ResourceProperty resourceProperty,
SelectionStrategy selectionStrategy){
this.offloadingPolicy = offloadingPolicy;
this.resourceProperty = resourceProperty;
this.selectionStrategy = selectionStrategy;
+ LOG.info("Registered offloading strategy: "
+ + this.selectionStrategy.getClass().getSimpleName() + " | "
+ + this.resourceProperty.getClass().getSimpleName() + " | "
+ + this.offloadingPolicy.getClass().getSimpleName());
}
public SelectionStrategy getSelectionStrategy() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java
new file mode 100644
index 0000000..19e852e
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/OffloadingStrategyFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.node.controller.management.offloading.model;
+
+import org.apache.streampipes.node.controller.management.offloading.model.policies.Comparator;
+import org.apache.streampipes.node.controller.management.offloading.model.policies.OffloadingPolicy;
+import org.apache.streampipes.node.controller.management.offloading.model.policies.ThresholdViolationOffloadingPolicy;
+import org.apache.streampipes.node.controller.management.offloading.model.property.CPULoadResourceProperty;
+import org.apache.streampipes.node.controller.management.offloading.model.property.FreeDiskSpaceResourceProperty;
+import org.apache.streampipes.node.controller.management.offloading.model.property.FreeMemoryResourceProperty;
+import org.apache.streampipes.node.controller.management.offloading.model.selection.CPULoadSelectionStrategy;
+import org.apache.streampipes.node.controller.management.offloading.model.selection.RandomSelectionStrategy;
+import org.apache.streampipes.node.controller.management.offloading.model.selection.SelectionStrategy;
+
+/**
+ * Creates the {@link OffloadingStrategy} selected through the {@code SP_AUTO_OFFLOADING_POLICY}
+ * environment variable.
+ */
+public class OffloadingStrategyFactory {
+
+    /** Name of the environment variable that selects the offloading policy. */
+    private static final String POLICY_ENV_KEY = "SP_AUTO_OFFLOADING_POLICY";
+
+    /**
+     * Builds the offloading strategy configured in the environment.
+     * <p>
+     * Recognised values are {@code "CPU"}, {@code "debug"}, {@code "memory"} and {@code "disk space"}.
+     * If the variable is unset, empty or holds any other value, the default CPU load strategy is used.
+     *
+     * @return the configured strategy; never {@code null}
+     */
+    public OffloadingStrategy getFromEnv(){
+        // Read the variable once so the emptiness check and the switch see the same value.
+        String policy = System.getenv(POLICY_ENV_KEY);
+        if(policy == null || policy.isEmpty()){
+            return defaultCpuLoadStrategy();
+        }
+        switch (policy){
+            case "CPU":
+                return defaultCpuLoadStrategy();
+            case "debug":
+                return new OffloadingStrategy<Float>(new ThresholdViolationOffloadingPolicy<>(5,
+                        Comparator.GREATER,0.5f, 3),
+                        new CPULoadResourceProperty(), new RandomSelectionStrategy());
+            case "memory":
+                // NOTE(review): 549755813888 bytes is 512 GiB (2^39) - confirm this threshold is intended.
+                return new OffloadingStrategy<Long>(new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
+                        549755813888L, 5),
+                        new FreeMemoryResourceProperty(), new RandomSelectionStrategy());
+            case "disk space":
+                // NOTE(review): same 512 GiB constant as the memory policy - confirm.
+                return new OffloadingStrategy<Long>(new ThresholdViolationOffloadingPolicy<>(5, Comparator.SMALLER,
+                        549755813888L, 5),
+                        new FreeDiskSpaceResourceProperty(), new RandomSelectionStrategy());
+            default:
+                // An unrecognised value previously produced null, which ResourceManager passes straight to
+                // OffloadingPolicyManager.addOffloadingStrategy(); fall back to the default instead.
+                return defaultCpuLoadStrategy();
+        }
+    }
+
+    /** Strategy used for "CPU" and whenever no valid policy is configured. */
+    private OffloadingStrategy defaultCpuLoadStrategy(){
+        return new OffloadingStrategy<Float>(new ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER,90f, 4),
+                new CPULoadResourceProperty(), new RandomSelectionStrategy());
+    }
+
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/CPULoadResourceProperty.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/CPULoadResourceProperty.java
index 7beb720..b695c4f 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/CPULoadResourceProperty.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/CPULoadResourceProperty.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.node.controller.management.offloading.model.property;
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.resource.ResourceMetrics;
public class CPULoadResourceProperty implements ResourceProperty<Float>{
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeDiskSpaceResourceProperty.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeDiskSpaceResourceProperty.java
index 840de8d..31e7ab8 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeDiskSpaceResourceProperty.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeDiskSpaceResourceProperty.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.node.controller.management.offloading.model.property;
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.resource.ResourceMetrics;
public class FreeDiskSpaceResourceProperty implements ResourceProperty<Long>{
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeMemoryResourceProperty.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeMemoryResourceProperty.java
index 11c5ce3..e58c4a3 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeMemoryResourceProperty.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/FreeMemoryResourceProperty.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.node.controller.management.offloading.model.property;
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.resource.ResourceMetrics;
public class FreeMemoryResourceProperty implements ResourceProperty<Long> {
@Override
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/ResourceProperty.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/ResourceProperty.java
index 9b61663..13abc3f 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/ResourceProperty.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/model/property/ResourceProperty.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.node.controller.management.offloading.model.property;
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.resource.ResourceMetrics;
public interface ResourceProperty<T> {
T getProperty(ResourceMetrics resourceMetrics);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
index 8f25aa7..1a318d9 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
@@ -167,15 +167,13 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
return response;
}
- public void postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
+ public Response postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
try {
String url = generateBackendOffloadEndpoint();
String desc = toJson(instanceToOffload);
- Request.Post(url)
- .bodyString(desc, ContentType.APPLICATION_JSON)
- .execute();
+ return handleResponse(Request.Post(url).bodyString(desc, ContentType.APPLICATION_JSON).execute());
} catch (IOException e) {
- e.printStackTrace();
+ throw new SpRuntimeException(e);
}
}
@@ -275,14 +273,49 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
}
private List<ConsumableStreamPipesEntity> getSupportedEntities(InvocableRegistration registration){
- //Check if the node supports the entity (atm checks if it has a GPU) TODO: simplify; check for other Hardware
- return registration.getSupportedPipelineElements().stream().filter(nse -> (!nse.getResourceRequirements().stream()
+ //Check if the node supports the entity (atm only hardware requirements; could be expanded to include
+ // software requirements)
+ return registration.getSupportedPipelineElements().stream()
+ .filter(this::checkGpuRequirement)
+ .filter(this::checkCpuRequirement)
+ .filter(this::checkMemoryRequirement)
+ //.filter(this::checkDiskSpaceRequirement) currently does not work (node diskspace is falsely set to 0
+ // in Node Hardware Resources, regardless of actual free diskspace) TODO: fix this issue
+ .collect(Collectors.toList());
+ }
+
+ private boolean checkGpuRequirement(ConsumableStreamPipesEntity spEntity){
+ boolean entityRequiresGpu = spEntity.getResourceRequirements().stream()
.filter(req -> req instanceof Hardware).filter(req -> ((Hardware) req).isGpu()).map(req -> ((Hardware) req)
- .isGpu()).findFirst().orElse(false) ||
- nse.getResourceRequirements().stream().filter(req -> req instanceof Hardware)
- .filter(req -> ((Hardware) req).isGpu()).map(req -> ((Hardware) req)
- .isGpu()).findFirst().orElse(false) == getNodeInfoDescription()
- .getNodeResources().getHardwareResource().getGpu().getCores()>0)).collect(Collectors.toList());
+ .isGpu()).findFirst().orElse(false);
+ boolean nodeHasGPU = (getNodeInfoDescription().getNodeResources().getHardwareResource().getGpu().getCores()>0);
+ return (!entityRequiresGpu || nodeHasGPU);
+ }
+
+ /**
+  * Checks whether this node offers at least as many CPU cores as the pipeline element requires.
+  * The requirement is taken from the first {@link Hardware} entry of the element's resource requirements;
+  * an element without such an entry is treated as requiring 0 cores.
+  *
+  * @param spEntity pipeline element whose requirements are checked
+  * @return {@code true} if the required core count does not exceed the node's core count
+  */
+ private boolean checkCpuRequirement(ConsumableStreamPipesEntity spEntity){
+     // No isGpu() filter here: it was copied from the GPU check and made the CPU requirement
+     // apply only to elements that also require a GPU.
+     int requiredCPUCores = spEntity.getResourceRequirements().stream()
+             .filter(req -> req instanceof Hardware)
+             .map(req -> ((Hardware) req).getCpuCores()).findFirst().orElse(0);
+     int nodeCPUCores = getNodeInfoDescription().getNodeResources().getHardwareResource().getCpu().getCores();
+     return (requiredCPUCores <= nodeCPUCores);
+ }
+
+ /**
+  * Checks whether this node's total memory covers the memory the pipeline element requires.
+  * The requirement is taken from the first {@link Hardware} entry of the element's resource requirements;
+  * an element without such an entry is treated as requiring 0.
+  *
+  * @param spEntity pipeline element whose requirements are checked
+  * @return {@code true} if the required memory does not exceed the node's total memory
+  */
+ private boolean checkMemoryRequirement(ConsumableStreamPipesEntity spEntity){
+     // No isGpu() filter here: it was copied from the GPU check and made the memory requirement
+     // apply only to elements that also require a GPU.
+     long requiredMemory = spEntity.getResourceRequirements().stream()
+             .filter(req -> req instanceof Hardware)
+             .map(req -> ((Hardware) req).getMemory()).findFirst().orElse(0L);
+     //Looks at total memory (could be adjusted to consider currently available Memory)
+     long actualNodeMemory = getNodeInfoDescription().getNodeResources().getHardwareResource().getMemory().getMemTotal();
+     return (requiredMemory <= actualNodeMemory);
+ }
+
+ /**
+  * Checks whether this node's total disk space covers the disk space the pipeline element requires.
+  * The requirement is taken from the first {@link Hardware} entry of the element's resource requirements;
+  * an element without such an entry is treated as requiring 0.
+  * <p>
+  * Not applied at the moment: the corresponding filter in getSupportedEntities is commented out.
+  *
+  * @param spEntity pipeline element whose requirements are checked
+  * @return {@code true} if the required disk space does not exceed the node's total disk space
+  */
+ private boolean checkDiskSpaceRequirement(ConsumableStreamPipesEntity spEntity){
+     // No isGpu() filter here: it was copied from the GPU check and made the disk requirement
+     // apply only to elements that also require a GPU.
+     long requiredDiskSpace = spEntity.getResourceRequirements().stream()
+             .filter(req -> req instanceof Hardware)
+             .map(req -> ((Hardware) req).getDisk()).findFirst().orElse(0L);
+     //Looks at total diskspace (could be adjusted to consider currently available diskspace)
+     long actualNodeDiskSpace = getNodeInfoDescription().getNodeResources().getHardwareResource().getDisk().getDiskTotal();
+     return (requiredDiskSpace <= actualNodeDiskSpace);
}
private List<InvocableStreamPipesEntity> getAllInvocables() {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
index a65039a..c3df5b9 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
@@ -21,12 +21,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.node.controller.config.NodeConfiguration;
import org.apache.streampipes.node.controller.management.offloading.OffloadingPolicyManager;
-import org.apache.streampipes.node.controller.management.offloading.model.OffloadingStrategy;
-import org.apache.streampipes.node.controller.management.offloading.model.policies.Comparator;
-import org.apache.streampipes.node.controller.management.offloading.model.policies.ThresholdViolationOffloadingPolicy;
-import org.apache.streampipes.node.controller.management.offloading.model.property.CPULoadResourceProperty;
-import org.apache.streampipes.node.controller.management.offloading.model.selection.RandomSelectionStrategy;
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.node.controller.management.offloading.model.OffloadingStrategyFactory;
+import org.apache.streampipes.model.resource.ResourceMetrics;
import org.apache.streampipes.node.controller.management.resource.utils.DiskSpace;
import org.apache.streampipes.node.controller.management.resource.utils.ResourceUtils;
import org.apache.streampipes.serializers.json.JacksonSerializer;
@@ -53,9 +49,7 @@ public class ResourceManager {
private ResourceManager() {
//Offloading Policy
- OffloadingPolicyManager.getInstance().addOffloadingStrategy(new OffloadingStrategy<Float>(new
- ThresholdViolationOffloadingPolicy<>(5, Comparator.GREATER,60f, 5),
- new CPULoadResourceProperty(), new RandomSelectionStrategy()));
+ OffloadingPolicyManager.getInstance().addOffloadingStrategy(new OffloadingStrategyFactory().getFromEnv());
}
public static ResourceManager getInstance() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index b8a688e..0545d22 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@ -19,42 +19,100 @@ package org.apache.streampipes.manager.migration;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.streampipes.manager.node.StreamPipesClusterManager;
+import org.apache.streampipes.manager.node.management.resources.ClusterResourceManager;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.node.NodeInfoDescription;
import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.resource.Hardware;
+import org.apache.streampipes.model.resource.ResourceMetrics;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Random;
+import java.util.*;
+import java.util.stream.Collectors;
public class MigrationPipelineGenerator {
- public static Pipeline generateMigrationPipeline(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
+ private InvocableStreamPipesEntity entityToMigrate;
+ private Pipeline correspondingPipeline;
+ /**
+  * Creates a generator for migrating a single pipeline element.
+  *
+  * @param entityToMigrate the pipeline element that should be moved to another node
+  * @param correspondingPipeline the pipeline used to look up the element's current invocation
+  */
+ public MigrationPipelineGenerator(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
+     this.correspondingPipeline = correspondingPipeline;
+     this.entityToMigrate = entityToMigrate;
+ }
+
+
+ /**
+  * Generates the pipeline that results from moving the entity to migrate onto another node.
+  * <p>
+  * Candidate nodes come from getNodeInfos(). For the "custom" execution policy they are narrowed to nodes
+  * sharing a location tag with the pipeline; in every case they are narrowed by resource utilization.
+  * One of the remaining nodes is then picked at random and written into the entity description.
+  *
+  * @return the result of generateTargetPipeline(), or {@code null} if no candidate node remains
+  */
+ public Pipeline generateMigrationPipeline(){
+
+     List<NodeInfoDescription> possibleTargetNodes = getNodeInfos();
+
+     // Null-safe comparison instead of a String switch: switching on a null policy throws a
+     // NullPointerException, and the former case labels fell through without break.
+     if("custom".equals(correspondingPipeline.getExecutionPolicy())){
+         possibleTargetNodes = filterLocationTags(possibleTargetNodes);
+     }
+     //TODO: incorporate strategy for locality-aware deployment ("locality-aware")
+     //TODO: incorporate strategy for default deployment ("default")
+
+     //Check current resource utilization on node
+     possibleTargetNodes = filterResourceUtilization(possibleTargetNodes);
+
+     //Different strategies possible (atm cancel offloading)
+     if(possibleTargetNodes == null || possibleTargetNodes.isEmpty()){
+         return null;
+     }
+
+     //Random Selection of new Node within the remaining possible nodes
+     changeEntityDescriptionToMatchRandomNode(possibleTargetNodes);
+
+     return generateTargetPipeline();
+ }
+
+ private List<NodeInfoDescription> getNodeInfos(){
List<NodeInfoDescription> possibleTargetNodes = new ArrayList<>();
List<NodeInfoDescription> nodeInfo = StreamPipesClusterManager.getAllActiveAndHealthyNodes();
nodeInfo.forEach(desc ->{
if(desc.getSupportedElements().stream().anyMatch(element -> element.equals(entityToMigrate.getAppId()))
- && !desc.getNodeControllerId().equals(entityToMigrate.getDeploymentTargetNodeId()))
+ && !desc.getNodeControllerId().equals(entityToMigrate.getDeploymentTargetNodeId()))
possibleTargetNodes.add(desc);
});
+ return possibleTargetNodes;
+ }
- if(possibleTargetNodes.isEmpty())
- return null;
+ /**
+  * Keeps the candidate nodes that share at least one location tag with the pipeline's node tags.
+  *
+  * @param possibleTargetNodes candidate nodes to filter
+  * @return a new list holding the matching nodes in their original order
+  */
+ private List<NodeInfoDescription> filterLocationTags(List<NodeInfoDescription> possibleTargetNodes){
+     List<NodeInfoDescription> matchingNodes = new ArrayList<>();
+     for(NodeInfoDescription candidate : possibleTargetNodes){
+         if(nodeTagsContainElementTag(correspondingPipeline.getNodeTags(), candidate)){
+             matchingNodes.add(candidate);
+         }
+     }
+     return matchingNodes;
+ }
- //Choose random node; should be adjusted to seek for a proper node to migrate to (e.g. based on user e.g.
- // selected labels, locality, free resources,...)
- NodeInfoDescription targetNode = possibleTargetNodes.get(new Random().nextInt(possibleTargetNodes.size()));
+ /**
+  * Tells whether a node carries at least one of the given pipeline node tags.
+  *
+  * @param pipelineNodeTags tags to look for
+  * @param desc node whose static metadata location tags are inspected
+  * @return {@code true} if any location tag of the node is contained in {@code pipelineNodeTags}
+  */
+ private boolean nodeTagsContainElementTag(List<String> pipelineNodeTags,
+ NodeInfoDescription desc){
+ return desc.getStaticNodeMetadata().getLocationTags().stream().anyMatch(pipelineNodeTags::contains);
+ }
- entityToMigrate.setDeploymentTargetNodeHostname(targetNode.getHostname());
- entityToMigrate.setDeploymentTargetNodeId(targetNode.getNodeControllerId());
- entityToMigrate.setDeploymentTargetNodePort(targetNode.getPort());
- entityToMigrate.setElementEndpointHostname(targetNode.getHostname());
- entityToMigrate.setElementEndpointPort(targetNode.getPort());
+ /**
+  * Keeps the candidate nodes whose recorded resource metrics satisfy the hardware requirement of the
+  * entity to migrate. Currently only free disk space and free memory are compared.
+  * <p>
+  * A node without recorded metrics is skipped rather than aborting the whole selection. If the entity
+  * declares no {@link Hardware} requirement there is nothing to compare, so all candidates are kept.
+  *
+  * @param possibleTargetNodes candidate nodes to filter
+  * @return the nodes that passed the check; never {@code null}
+  */
+ private List<NodeInfoDescription> filterResourceUtilization(List<NodeInfoDescription> possibleTargetNodes){
+     // The requirement does not depend on the node, so it is looked up once instead of per iteration.
+     Hardware hardware = entityToMigrate.getResourceRequirements().stream()
+             .filter(nodeRR -> nodeRR instanceof Hardware).map(nodeRR -> (Hardware)nodeRR).findFirst()
+             .orElse(null);
+     if(hardware == null){
+         // No hardware requirement declared: no constraint, so every candidate stays eligible.
+         return possibleTargetNodes;
+     }
+
+     List<NodeInfoDescription> filteredTargetNodes = new ArrayList<>();
+     for(NodeInfoDescription nodeInfo : possibleTargetNodes){
+         Queue<ResourceMetrics> rmHistory = ClusterResourceManager.getResourceMetricsMap()
+                 .get(nodeInfo.getNodeControllerId());
+         // Peek once so both comparisons use the same snapshot.
+         // NOTE(review): peek() returns the head of the queue - confirm that is the most recent sample.
+         ResourceMetrics metrics = (rmHistory == null) ? null : rmHistory.peek();
+         if(metrics == null){
+             // No metrics known for this node: skip it, other nodes may still qualify.
+             continue;
+         }
+         if(hardware.getDisk() <= metrics.getFreeDiskSpaceInBytes()
+                 && hardware.getMemory() <= metrics.getFreeMemoryInBytes()){
+             filteredTargetNodes.add(nodeInfo);
+         }
+     }
+     return filteredTargetNodes;
+ }
+ private Pipeline generateTargetPipeline(){
Optional<DataProcessorInvocation> originalInvocation =
correspondingPipeline.getSepas().stream().filter(dp ->
dp.getDeploymentRunningInstanceId().equals(entityToMigrate.getDeploymentRunningInstanceId()))
@@ -75,4 +133,13 @@ public class MigrationPipelineGenerator {
return targetPipeline;
}
+ /**
+  * Picks one of the given nodes at random and writes its hostname, node controller id and port into the
+  * deployment target and element endpoint fields of the entity to migrate.
+  *
+  * @param nodes candidate nodes; must not be empty
+  */
+ private void changeEntityDescriptionToMatchRandomNode(List<NodeInfoDescription> nodes){
+     int randomIndex = new Random().nextInt(nodes.size());
+     NodeInfoDescription chosenNode = nodes.get(randomIndex);
+
+     // Deployment target
+     entityToMigrate.setDeploymentTargetNodeHostname(chosenNode.getHostname());
+     entityToMigrate.setDeploymentTargetNodeId(chosenNode.getNodeControllerId());
+     entityToMigrate.setDeploymentTargetNodePort(chosenNode.getPort());
+     // Element endpoint
+     entityToMigrate.setElementEndpointHostname(chosenNode.getHostname());
+     entityToMigrate.setElementEndpointPort(chosenNode.getPort());
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
index 3ab464b..f792197 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
@@ -170,11 +170,10 @@ public class StreamPipesClusterManager extends AbstractClusterManager {
public static Message handleOffloadRequest(InvocableStreamPipesEntity elementToMigrate) {
Pipeline currentPipeline = getPipelineStorageApi().getPipeline(elementToMigrate.getCorrespondingPipeline());
- Pipeline offloadPipeline = MigrationPipelineGenerator.generateMigrationPipeline(elementToMigrate,
- currentPipeline);
- //TODO: Handle this case properly
+ Pipeline offloadPipeline = new MigrationPipelineGenerator(elementToMigrate, currentPipeline)
+ .generateMigrationPipeline();
if(offloadPipeline == null)
- return Notifications.error(NotificationType.UNKNOWN_ERROR);
+ return Notifications.error(NotificationType.NO_NODE_FOUND);
try {
PipelineOperationStatus status = Operations.handlePipelineElementMigration(offloadPipeline,
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
index 7b75c19..47cea26 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
@@ -17,21 +17,29 @@
*/
package org.apache.streampipes.manager.node.management.resources;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.model.resource.ResourceMetrics;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.storage.api.INodeInfoStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.Socket;
-import java.net.URL;
+import java.io.IOException;
+import java.net.*;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
public class ClusterResourceManager {
- private static final int RESOURCE_RETRIEVE_FREQUENCY_MS = 60000;
- private static final int SOCKET_TIMEOUT_MS = 500;
+ private static final int SOCKET_TIMEOUT_MS = 1000;
private static ClusterResourceManager instance = null;
+ private static Map<String, Queue<ResourceMetrics>> resourceMetricsMap = new HashMap<>();
private ClusterResourceManager() {}
@@ -45,47 +53,33 @@ public class ClusterResourceManager {
return instance;
}
- public void run() {
- new Thread(getNodes, "nodes").start();
+ public static Map<String, Queue<ResourceMetrics>> getResourceMetricsMap(){
+ return resourceMetricsMap;
}
- private final Runnable getNodes = () -> {
- while (true) {
- try {
- List<NodeInfoDescription> nodes = getNodeStorageApi().getAllNodes();
- if (nodes.size() > 0) {
- nodes.forEach(node -> {
- try {
- URL nodeUrl = generateNodeUrl(node);
- // TODO: gather current resources from all active node controller endpoints
-
- } catch (MalformedURLException e) {
- e.printStackTrace();
- }
- });
+ public void checkResources(){
+ List<NodeInfoDescription> nodes = getNodeStorageApi().getAllActiveNodes();
+ if (nodes.size() > 0) {
+ nodes.forEach(node -> {
+ try {
+ URL nodeUrl = generateNodeUrl(node);
+ Response resp = Request.Get(nodeUrl.toURI()).socketTimeout(SOCKET_TIMEOUT_MS).execute();
+ addResourceMetrics(node, extractResourceMetrics(resp.returnContent().asString()));
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ } catch (ClientProtocolException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
}
- Thread.sleep(RESOURCE_RETRIEVE_FREQUENCY_MS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ });
}
- };
-
- private URL generateNodeUrl(NodeInfoDescription desc) throws MalformedURLException {
- return new URL("http", desc.getHostname(), desc.getPort(), "");
}
- private boolean healthCheck(URL url) {
- boolean isAlive = true;
- try {
- InetSocketAddress sa = new InetSocketAddress(url.getHost(), url.getPort());
- Socket ss = new Socket();
- ss.connect(sa, SOCKET_TIMEOUT_MS);
- ss.close();
- } catch(Exception e) {
- isAlive = false;
- }
- return isAlive;
+ private URL generateNodeUrl(NodeInfoDescription desc) throws MalformedURLException {
+ return new URL("http", desc.getHostname(), desc.getPort(), "/api/v2/node/info/resources");
}
// Helpers
@@ -93,4 +87,23 @@ public class ClusterResourceManager {
private static INodeInfoStorage getNodeStorageApi() {
return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage();
}
+
+ private ResourceMetrics extractResourceMetrics(String rm){
+ try {
+ return JacksonSerializer.getObjectMapper().readValue(rm, ResourceMetrics.class);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ private void addResourceMetrics(NodeInfoDescription node, ResourceMetrics rm){
+ if(!resourceMetricsMap.containsKey(node.getNodeControllerId()))
+ resourceMetricsMap.put(node.getNodeControllerId(), new ArrayBlockingQueue<ResourceMetrics>(10));
+ if(!resourceMetricsMap.get(node.getNodeControllerId()).offer(rm)){
+ resourceMetricsMap.get(node.getNodeControllerId()).poll();
+ resourceMetricsMap.get(node.getNodeControllerId()).offer(rm);
+ }
+ }
+
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
index 5ccd75b..e3644c9 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
@@ -17,15 +17,11 @@
*/
package org.apache.streampipes.rest.impl;
-import org.apache.streampipes.manager.migration.MigrationPipelineGenerator;
import org.apache.streampipes.manager.node.StreamPipesClusterManager;
-import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.message.NotificationType;
import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.rest.api.INode;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;