You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/05/10 21:03:16 UTC
[incubator-streampipes] 05/07: Merge branch 'edge-extensions' of
https://github.com/apache/incubator-streampipes into edge-extensions
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 388de9106b881e8e2806ea92faa85e9521fa2082
Merge: d27684a e4c427c
Author: Daniel Gomm <da...@outlook.de>
AuthorDate: Thu May 6 16:47:33 2021 +0200
Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions
Conflicts:
streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
streampipes-model/src/main/java/org/apache/streampipes/model/node/monitor/ResourceMetrics.java
streampipes-model/src/main/java/org/apache/streampipes/model/resource/ResourceMetrics.java
streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/NodeManagementResource.java
.../runConfigurations/node_controller_edge_01.xml | 1 +
.../runConfigurations/node_controller_edge_02.xml | 2 +-
pom.xml | 1 +
streampipes-backend/pom.xml | 6 +
.../backend/StreamPipesBackendApplication.java | 7 +-
.../backend/StreamPipesResourceConfig.java | 2 +-
.../apache/streampipes/model/NodeHealthStatus.java | 76 ++++++++
.../model/grounding/TransportProtocol.java | 1 +
.../streampipes/model/node/NodeCondition.java | 22 ++-
.../model/node/NodeInfoDescription.java | 23 +++
.../model/node/NodeInfoDescriptionBuilder.java | 1 +
.../model/node/container/DeploymentContainer.java | 51 ++++-
.../model/node/container/DockerContainer.java | 6 +-
.../node/container/DockerContainerBuilder.java | 19 +-
.../node/container/SupportedArchitectures.java | 30 +--
.../model/node/container/SupportedOsType.java | 30 +--
.../monitor}/ResourceMetrics.java | 2 +-
.../api/ContainerDeploymentResource.java | 16 +-
.../node/controller/api/HealthCheckResource.java | 10 +-
.../node/controller/config/NodeConfiguration.java | 1 +
.../container/DockerExtensionsContainer.java | 7 +
.../controller/container/DockerKafkaContainer.java | 7 +
.../container/DockerMosquittoContainer.java | 7 +
.../container/DockerZookeeperContainer.java | 7 +
.../management/NodeControllerSubmitter.java | 4 +-
.../controller/management/node/NodeConstants.java | 2 +-
...tratorManager.java => DockerEngineManager.java} | 24 +--
...inerOrchestrator.java => IContainerEngine.java} | 2 +-
.../docker/DockerContainerDeclarerSingleton.java | 2 +-
.../orchestrator/docker/utils/DockerUtils.java | 2 +-
.../management/pe/InvocableElementManager.java | 12 +-
.../management/resource/ResourceManager.java | 5 +
streampipes-node-management/pom.xml | 50 +++++
.../node/management/NodeManagement.java | 151 +++++++++++++++
.../management/operation/monitor/NodeMonitor.java | 9 +-
.../monitor/health/ClusterHealthCheckMonitor.java | 189 +++++++++++++++++++
.../operation/monitor/health/NodeHealthCheck.java | 64 +++++++
.../operation/monitor/health/NodeLiveness.java | 102 ++++++++++
.../monitor/resource/ClusterResourceMonitor.java | 65 +++++++
.../monitor/resource/NodeResourceCollector.java | 64 +++++++
.../management/operation/relay/RelayHandler.java | 60 ++++++
.../management/operation/relay/RelayOperation.java | 8 +-
.../operation/sync/SynchronizationFactory.java | 44 +++++
.../operation/sync/SynchronizationHandler.java | 77 ++++++++
.../operation/sync/SynchronizationType.java | 9 +-
.../node/management/utils/HttpRequest.java | 6 +-
.../node/management/utils/HttpUtils.java | 130 +++++++++++++
.../node/management/utils/StorageUtils.java | 84 +++++++++
streampipes-pipeline-management/pom.xml | 5 +
.../pipeline/AbstractPipelineExecutor.java | 163 ++++++++++++++--
.../pipeline/PipelineMigrationExecutor.java | 9 +-
.../manager/matching/InvocationGraphBuilder.java | 44 ++---
.../migration/MigrationPipelineGenerator.java | 3 +-
.../migration/PipelineElementOffloadHandler.java | 69 +++++++
.../manager/node/StreamPipesClusterManager.java | 207 ---------------------
.../management/cluster/AbstractClusterManager.java | 162 ----------------
.../management/cluster/AvailableNodesFetcher.java | 66 -------
.../management/healthcheck/ClusterHealthCheck.java | 82 --------
.../resources/ClusterResourceManager.java | 109 -----------
.../streampipes/manager/operations/Operations.java | 11 +-
streampipes-rest/pom.xml | 6 +
.../rest/api/{INode.java => INodeManagement.java} | 13 +-
.../{Node.java => NodeManagementResource.java} | 53 ++----
.../streampipes/rest/impl/PipelineResource.java | 12 ++
.../storage/couchdb/impl/NodeInfoStorageImpl.java | 8 +-
.../apache/streampipes/vocabulary/StreamPipes.java | 4 +-
.../node-configuration.component.html | 21 ++-
.../node-configuration.component.scss | 4 +
.../node-configuration.component.ts | 30 ++-
.../data-marketplace/data-marketplace.component.ts | 4 +-
ui/src/app/core-model/gen/streampipes-model.ts | 16 +-
.../migrate-pipeline-processors.component.ts | 2 +-
.../save-pipeline/save-pipeline.component.ts | 53 ++++--
ui/src/app/platform-services/apis/node.service.ts | 8 +-
74 files changed, 1810 insertions(+), 854 deletions(-)
diff --cc streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
index 1a318d9,3e2ad80..671630c
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
@@@ -167,13 -163,16 +167,13 @@@ public class InvocableElementManager im
return response;
}
- public void postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
+ public Response postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
try {
- String url = generateBackendOffloadEndpoint();
+ String url = generatePipelineManagementOffloadEndpoint();
String desc = toJson(instanceToOffload);
- org.apache.http.client.fluent.Response resp = Request.Post(url)
- .bodyString(desc, ContentType.APPLICATION_JSON)
- .execute();
- resp.returnContent();
+ return handleResponse(Request.Post(url).bodyString(desc, ContentType.APPLICATION_JSON).execute());
} catch (IOException e) {
- e.printStackTrace();
+ throw new SpRuntimeException(e);
}
}
@@@ -259,10 -258,9 +259,10 @@@
}
private void updateAndSyncNodeInfoDescription(InvocableRegistration registration) {
- setSupportedPipelineElements(registration.getSupportedPipelineElementAppIds());
+ setSupportedPipelineElements(getSupportedEntities(registration).stream()
+ .map(NamedStreamPipesEntity::getAppId).collect(Collectors.toList()));
try {
- String url = generateBackendEndpoint();
+ String url = generateNodeManagementUpdateEndpoint();
String desc = toJson(getNodeInfoDescription());
Request.Put(url)
.bodyString(desc, ContentType.APPLICATION_JSON)
diff --cc streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
index c3df5b9,737f521..76eabfa
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
@@@ -20,9 -20,11 +20,14 @@@ package org.apache.streampipes.node.con
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.node.controller.config.NodeConfiguration;
+import org.apache.streampipes.node.controller.management.offloading.OffloadingPolicyManager;
+import org.apache.streampipes.node.controller.management.offloading.model.OffloadingStrategyFactory;
+import org.apache.streampipes.model.resource.ResourceMetrics;
+ import org.apache.streampipes.node.controller.management.offloading.AutoOffloadingManager;
+ import org.apache.streampipes.node.controller.management.offloading.policies.Comparator;
+ import org.apache.streampipes.node.controller.management.offloading.policies.MultiOccurrenceThresholdViolationPolicy;
+ import org.apache.streampipes.node.controller.management.offloading.policies.OffloadingPolicy;
+ import org.apache.streampipes.model.node.monitor.ResourceMetrics;
import org.apache.streampipes.node.controller.management.resource.utils.DiskSpace;
import org.apache.streampipes.node.controller.management.resource.utils.ResourceUtils;
import org.apache.streampipes.serializers.json.JacksonSerializer;
diff --cc streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index 0545d22,f69a3b9..f670341
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@@ -24,95 -22,39 +24,96 @@@ import org.apache.streampipes.model.bas
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.node.NodeInfoDescription;
import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.resource.Hardware;
+import org.apache.streampipes.model.resource.ResourceMetrics;
+ import org.apache.streampipes.node.management.NodeManagement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Random;
+import java.util.*;
+import java.util.stream.Collectors;
public class MigrationPipelineGenerator {
- public static Pipeline generateMigrationPipeline(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
+ private InvocableStreamPipesEntity entityToMigrate;
+ private Pipeline correspondingPipeline;
+ public MigrationPipelineGenerator(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
+ this.entityToMigrate = entityToMigrate;
+ this.correspondingPipeline = correspondingPipeline;
+ }
+
+
+ public Pipeline generateMigrationPipeline(){
+
+ List<NodeInfoDescription> possibleTargetNodes = getNodeInfos();
+
+ switch(correspondingPipeline.getExecutionPolicy()){
+ case "custom":
+ possibleTargetNodes = filterLocationTags(possibleTargetNodes);
+ case "locality-aware":
+ //TODO: incorporate strategy for locality-aware deployment
+ case "default":
+ //TODO: incorporate strategy for default deployment
+ }
+
+ //Check current resource utilization on node
+ possibleTargetNodes = filterResourceUtilization(possibleTargetNodes);
+
+
+ //Different strategies possible (atm cancel offloading)
+ if(possibleTargetNodes == null || possibleTargetNodes.isEmpty())
+ return null;
+
+ //Random Selection of new Node within the remaining possible nodes
+ changeEntityDescriptionToMatchRandomNode(possibleTargetNodes);
+
+ return generateTargetPipeline();
+ }
+
+ private List<NodeInfoDescription> getNodeInfos(){
List<NodeInfoDescription> possibleTargetNodes = new ArrayList<>();
- List<NodeInfoDescription> nodeInfo = StreamPipesClusterManager.getAllActiveAndHealthyNodes();
+ List<NodeInfoDescription> nodeInfo = NodeManagement.getInstance().getOnlineNodes();
nodeInfo.forEach(desc ->{
if(desc.getSupportedElements().stream().anyMatch(element -> element.equals(entityToMigrate.getAppId()))
- && !desc.getNodeControllerId().equals(entityToMigrate.getDeploymentTargetNodeId()))
+ && !desc.getNodeControllerId().equals(entityToMigrate.getDeploymentTargetNodeId()))
possibleTargetNodes.add(desc);
});
+ return possibleTargetNodes;
+ }
- if(possibleTargetNodes.isEmpty())
- return null;
+ private List<NodeInfoDescription> filterLocationTags(List<NodeInfoDescription> possibleTargetNodes){
+ return possibleTargetNodes.stream()
+ .filter(desc -> nodeTagsContainElementTag(correspondingPipeline.getNodeTags(), desc))
+ .collect(Collectors.toList());
+ }
- //Choose random node; should be adjusted to seek for a proper node to migrate to (e.g. based on user e.g.
- // selected labels, locality, free resources,...)
- NodeInfoDescription targetNode = possibleTargetNodes.get(new Random().nextInt(possibleTargetNodes.size()));
+ private boolean nodeTagsContainElementTag(List<String> pipelineNodeTags,
+ NodeInfoDescription desc){
+ return desc.getStaticNodeMetadata().getLocationTags().stream().anyMatch(pipelineNodeTags::contains);
+ }
- entityToMigrate.setDeploymentTargetNodeHostname(targetNode.getHostname());
- entityToMigrate.setDeploymentTargetNodeId(targetNode.getNodeControllerId());
- entityToMigrate.setDeploymentTargetNodePort(targetNode.getPort());
- entityToMigrate.setElementEndpointHostname(targetNode.getHostname());
- entityToMigrate.setElementEndpointPort(targetNode.getPort());
+ private List<NodeInfoDescription> filterResourceUtilization(List<NodeInfoDescription> possibleTargetNodes){
+ //Currently only checking for free disk space and memory
+ List<NodeInfoDescription> filteredTargetNodes = new ArrayList<>();
+ for(NodeInfoDescription nodeInfo : possibleTargetNodes){
+ Queue<ResourceMetrics> rmHistory = ClusterResourceManager.getResourceMetricsMap()
+ .get(nodeInfo.getNodeControllerId());
+ if(rmHistory == null) return null;
+ Hardware hardware = entityToMigrate.getResourceRequirements().stream()
+ .filter(nodeRR -> nodeRR instanceof Hardware).map(nodeRR -> (Hardware)nodeRR).findFirst().
+ orElse(null);
+ if(hardware != null){
+ if (rmHistory.peek() != null
+ && hardware.getDisk() <= rmHistory.peek().getFreeDiskSpaceInBytes()
+ && hardware.getMemory() <= rmHistory.peek().getFreeMemoryInBytes()) {
+ filteredTargetNodes.add(nodeInfo);
+ }
+ }
+ }
+ return filteredTargetNodes;
+ }
+ private Pipeline generateTargetPipeline(){
Optional<DataProcessorInvocation> originalInvocation =
correspondingPipeline.getSepas().stream().filter(dp ->
dp.getDeploymentRunningInstanceId().equals(entityToMigrate.getDeploymentRunningInstanceId()))