You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/05/10 21:03:16 UTC

[incubator-streampipes] 05/07: Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 388de9106b881e8e2806ea92faa85e9521fa2082
Merge: d27684a e4c427c
Author: Daniel Gomm <da...@outlook.de>
AuthorDate: Thu May 6 16:47:33 2021 +0200

    Merge branch 'edge-extensions' of https://github.com/apache/incubator-streampipes into edge-extensions
    
     Conflicts:
    	streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
    	streampipes-model/src/main/java/org/apache/streampipes/model/node/monitor/ResourceMetrics.java
    	streampipes-model/src/main/java/org/apache/streampipes/model/resource/ResourceMetrics.java
    	streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
    	streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java
    	streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
    	streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
    	streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
    	streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/NodeManagementResource.java

 .../runConfigurations/node_controller_edge_01.xml  |   1 +
 .../runConfigurations/node_controller_edge_02.xml  |   2 +-
 pom.xml                                            |   1 +
 streampipes-backend/pom.xml                        |   6 +
 .../backend/StreamPipesBackendApplication.java     |   7 +-
 .../backend/StreamPipesResourceConfig.java         |   2 +-
 .../apache/streampipes/model/NodeHealthStatus.java |  76 ++++++++
 .../model/grounding/TransportProtocol.java         |   1 +
 .../streampipes/model/node/NodeCondition.java      |  22 ++-
 .../model/node/NodeInfoDescription.java            |  23 +++
 .../model/node/NodeInfoDescriptionBuilder.java     |   1 +
 .../model/node/container/DeploymentContainer.java  |  51 ++++-
 .../model/node/container/DockerContainer.java      |   6 +-
 .../node/container/DockerContainerBuilder.java     |  19 +-
 .../node/container/SupportedArchitectures.java     |  30 +--
 .../model/node/container/SupportedOsType.java      |  30 +--
 .../monitor}/ResourceMetrics.java                  |   2 +-
 .../api/ContainerDeploymentResource.java           |  16 +-
 .../node/controller/api/HealthCheckResource.java   |  10 +-
 .../node/controller/config/NodeConfiguration.java  |   1 +
 .../container/DockerExtensionsContainer.java       |   7 +
 .../controller/container/DockerKafkaContainer.java |   7 +
 .../container/DockerMosquittoContainer.java        |   7 +
 .../container/DockerZookeeperContainer.java        |   7 +
 .../management/NodeControllerSubmitter.java        |   4 +-
 .../controller/management/node/NodeConstants.java  |   2 +-
 ...tratorManager.java => DockerEngineManager.java} |  24 +--
 ...inerOrchestrator.java => IContainerEngine.java} |   2 +-
 .../docker/DockerContainerDeclarerSingleton.java   |   2 +-
 .../orchestrator/docker/utils/DockerUtils.java     |   2 +-
 .../management/pe/InvocableElementManager.java     |  12 +-
 .../management/resource/ResourceManager.java       |   5 +
 streampipes-node-management/pom.xml                |  50 +++++
 .../node/management/NodeManagement.java            | 151 +++++++++++++++
 .../management/operation/monitor/NodeMonitor.java  |   9 +-
 .../monitor/health/ClusterHealthCheckMonitor.java  | 189 +++++++++++++++++++
 .../operation/monitor/health/NodeHealthCheck.java  |  64 +++++++
 .../operation/monitor/health/NodeLiveness.java     | 102 ++++++++++
 .../monitor/resource/ClusterResourceMonitor.java   |  65 +++++++
 .../monitor/resource/NodeResourceCollector.java    |  64 +++++++
 .../management/operation/relay/RelayHandler.java   |  60 ++++++
 .../management/operation/relay/RelayOperation.java |   8 +-
 .../operation/sync/SynchronizationFactory.java     |  44 +++++
 .../operation/sync/SynchronizationHandler.java     |  77 ++++++++
 .../operation/sync/SynchronizationType.java        |   9 +-
 .../node/management/utils/HttpRequest.java         |   6 +-
 .../node/management/utils/HttpUtils.java           | 130 +++++++++++++
 .../node/management/utils/StorageUtils.java        |  84 +++++++++
 streampipes-pipeline-management/pom.xml            |   5 +
 .../pipeline/AbstractPipelineExecutor.java         | 163 ++++++++++++++--
 .../pipeline/PipelineMigrationExecutor.java        |   9 +-
 .../manager/matching/InvocationGraphBuilder.java   |  44 ++---
 .../migration/MigrationPipelineGenerator.java      |   3 +-
 .../migration/PipelineElementOffloadHandler.java   |  69 +++++++
 .../manager/node/StreamPipesClusterManager.java    | 207 ---------------------
 .../management/cluster/AbstractClusterManager.java | 162 ----------------
 .../management/cluster/AvailableNodesFetcher.java  |  66 -------
 .../management/healthcheck/ClusterHealthCheck.java |  82 --------
 .../resources/ClusterResourceManager.java          | 109 -----------
 .../streampipes/manager/operations/Operations.java |  11 +-
 streampipes-rest/pom.xml                           |   6 +
 .../rest/api/{INode.java => INodeManagement.java}  |  13 +-
 .../{Node.java => NodeManagementResource.java}     |  53 ++----
 .../streampipes/rest/impl/PipelineResource.java    |  12 ++
 .../storage/couchdb/impl/NodeInfoStorageImpl.java  |   8 +-
 .../apache/streampipes/vocabulary/StreamPipes.java |   4 +-
 .../node-configuration.component.html              |  21 ++-
 .../node-configuration.component.scss              |   4 +
 .../node-configuration.component.ts                |  30 ++-
 .../data-marketplace/data-marketplace.component.ts |   4 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  16 +-
 .../migrate-pipeline-processors.component.ts       |   2 +-
 .../save-pipeline/save-pipeline.component.ts       |  53 ++++--
 ui/src/app/platform-services/apis/node.service.ts  |   8 +-
 74 files changed, 1810 insertions(+), 854 deletions(-)

diff --cc streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
index 1a318d9,3e2ad80..671630c
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
@@@ -167,13 -163,16 +167,13 @@@ public class InvocableElementManager im
          return response;
      }
  
 -    public void postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
 +    public Response postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
          try {
-             String url = generateBackendOffloadEndpoint();
+             String url = generatePipelineManagementOffloadEndpoint();
              String desc = toJson(instanceToOffload);
 -            org.apache.http.client.fluent.Response resp = Request.Post(url)
 -                    .bodyString(desc, ContentType.APPLICATION_JSON)
 -                    .execute();
 -            resp.returnContent();
 +            return handleResponse(Request.Post(url).bodyString(desc, ContentType.APPLICATION_JSON).execute());
          } catch (IOException e) {
 -            e.printStackTrace();
 +            throw new SpRuntimeException(e);
          }
      }
  
@@@ -259,10 -258,9 +259,10 @@@
      }
  
      private void updateAndSyncNodeInfoDescription(InvocableRegistration registration) {
 -        setSupportedPipelineElements(registration.getSupportedPipelineElementAppIds());
 +        setSupportedPipelineElements(getSupportedEntities(registration).stream()
 +                .map(NamedStreamPipesEntity::getAppId).collect(Collectors.toList()));
          try {
-             String url = generateBackendEndpoint();
+             String url = generateNodeManagementUpdateEndpoint();
              String desc = toJson(getNodeInfoDescription());
              Request.Put(url)
                      .bodyString(desc, ContentType.APPLICATION_JSON)
diff --cc streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
index c3df5b9,737f521..76eabfa
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
@@@ -20,9 -20,11 +20,14 @@@ package org.apache.streampipes.node.con
  import com.fasterxml.jackson.core.JsonProcessingException;
  import org.apache.streampipes.commons.exceptions.SpRuntimeException;
  import org.apache.streampipes.node.controller.config.NodeConfiguration;
 +import org.apache.streampipes.node.controller.management.offloading.OffloadingPolicyManager;
 +import org.apache.streampipes.node.controller.management.offloading.model.OffloadingStrategyFactory;
 +import org.apache.streampipes.model.resource.ResourceMetrics;
+ import org.apache.streampipes.node.controller.management.offloading.AutoOffloadingManager;
+ import org.apache.streampipes.node.controller.management.offloading.policies.Comparator;
+ import org.apache.streampipes.node.controller.management.offloading.policies.MultiOccurrenceThresholdViolationPolicy;
+ import org.apache.streampipes.node.controller.management.offloading.policies.OffloadingPolicy;
+ import org.apache.streampipes.model.node.monitor.ResourceMetrics;
  import org.apache.streampipes.node.controller.management.resource.utils.DiskSpace;
  import org.apache.streampipes.node.controller.management.resource.utils.ResourceUtils;
  import org.apache.streampipes.serializers.json.JacksonSerializer;
diff --cc streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index 0545d22,f69a3b9..f670341
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@@ -24,95 -22,39 +24,96 @@@ import org.apache.streampipes.model.bas
  import org.apache.streampipes.model.graph.DataProcessorInvocation;
  import org.apache.streampipes.model.node.NodeInfoDescription;
  import org.apache.streampipes.model.pipeline.Pipeline;
 +import org.apache.streampipes.model.resource.Hardware;
 +import org.apache.streampipes.model.resource.ResourceMetrics;
+ import org.apache.streampipes.node.management.NodeManagement;
  
  
 -import java.util.ArrayList;
 -import java.util.List;
 -import java.util.Optional;
 -import java.util.Random;
 +import java.util.*;
 +import java.util.stream.Collectors;
  
  public class MigrationPipelineGenerator {
  
 -    public static Pipeline generateMigrationPipeline(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
 +    private InvocableStreamPipesEntity entityToMigrate;
 +    private Pipeline correspondingPipeline;
  
 +    public MigrationPipelineGenerator(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
 +        this.entityToMigrate = entityToMigrate;
 +        this.correspondingPipeline = correspondingPipeline;
 +    }
 +
 +
 +    public Pipeline generateMigrationPipeline(){
 +
 +        List<NodeInfoDescription> possibleTargetNodes = getNodeInfos();
 +
 +        switch(correspondingPipeline.getExecutionPolicy()){
 +            case "custom":
 +                possibleTargetNodes = filterLocationTags(possibleTargetNodes);
 +            case "locality-aware":
 +                //TODO: incorporate strategy for locality-aware deployment
 +            case "default":
 +                //TODO: incorporate strategy for default deployment
 +        }
 +
 +        //Check current resource utilization on node
 +        possibleTargetNodes = filterResourceUtilization(possibleTargetNodes);
 +
 +
 +        //Different strategies possible (atm cancel offloading)
 +        if(possibleTargetNodes == null || possibleTargetNodes.isEmpty())
 +            return null;
 +
 +        //Random Selection of new Node within the remaining possible nodes
 +        changeEntityDescriptionToMatchRandomNode(possibleTargetNodes);
 +
 +        return generateTargetPipeline();
 +    }
 +
 +    private List<NodeInfoDescription> getNodeInfos(){
          List<NodeInfoDescription> possibleTargetNodes = new ArrayList<>();
-         List<NodeInfoDescription> nodeInfo = StreamPipesClusterManager.getAllActiveAndHealthyNodes();
+         List<NodeInfoDescription> nodeInfo = NodeManagement.getInstance().getOnlineNodes();
          nodeInfo.forEach(desc ->{
              if(desc.getSupportedElements().stream().anyMatch(element -> element.equals(entityToMigrate.getAppId()))
 -                && !desc.getNodeControllerId().equals(entityToMigrate.getDeploymentTargetNodeId()))
 +                    && !desc.getNodeControllerId().equals(entityToMigrate.getDeploymentTargetNodeId()))
                  possibleTargetNodes.add(desc);
          });
 +        return possibleTargetNodes;
 +    }
  
 -        if(possibleTargetNodes.isEmpty())
 -            return null;
 +    private List<NodeInfoDescription> filterLocationTags(List<NodeInfoDescription> possibleTargetNodes){
 +        return possibleTargetNodes.stream()
 +                .filter(desc -> nodeTagsContainElementTag(correspondingPipeline.getNodeTags(), desc))
 +                .collect(Collectors.toList());
 +    }
  
 -        //Choose random node; should be adjusted to seek for a proper node to migrate to (e.g. based on user e.g.
 -        // selected labels, locality, free resources,...)
 -        NodeInfoDescription targetNode = possibleTargetNodes.get(new Random().nextInt(possibleTargetNodes.size()));
 +    private boolean nodeTagsContainElementTag(List<String> pipelineNodeTags,
 +                                              NodeInfoDescription desc){
 +        return desc.getStaticNodeMetadata().getLocationTags().stream().anyMatch(pipelineNodeTags::contains);
 +    }
  
 -        entityToMigrate.setDeploymentTargetNodeHostname(targetNode.getHostname());
 -        entityToMigrate.setDeploymentTargetNodeId(targetNode.getNodeControllerId());
 -        entityToMigrate.setDeploymentTargetNodePort(targetNode.getPort());
 -        entityToMigrate.setElementEndpointHostname(targetNode.getHostname());
 -        entityToMigrate.setElementEndpointPort(targetNode.getPort());
 +    private List<NodeInfoDescription> filterResourceUtilization(List<NodeInfoDescription> possibleTargetNodes){
 +        //Currently only checking for free disk space and memory
 +        List<NodeInfoDescription> filteredTargetNodes = new ArrayList<>();
 +        for(NodeInfoDescription nodeInfo : possibleTargetNodes){
 +            Queue<ResourceMetrics> rmHistory = ClusterResourceManager.getResourceMetricsMap()
 +                    .get(nodeInfo.getNodeControllerId());
 +            if(rmHistory == null) return null;
 +            Hardware hardware = entityToMigrate.getResourceRequirements().stream()
 +                            .filter(nodeRR -> nodeRR instanceof Hardware).map(nodeRR -> (Hardware)nodeRR).findFirst().
 +                            orElse(null);
 +            if(hardware != null){
 +                if (rmHistory.peek() != null
 +                        && hardware.getDisk() <= rmHistory.peek().getFreeDiskSpaceInBytes()
 +                        && hardware.getMemory() <= rmHistory.peek().getFreeMemoryInBytes()) {
 +                    filteredTargetNodes.add(nodeInfo);
 +                }
 +            }
 +        }
 +        return filteredTargetNodes;
 +    }
  
 +    private Pipeline generateTargetPipeline(){
          Optional<DataProcessorInvocation> originalInvocation =
                  correspondingPipeline.getSepas().stream().filter(dp ->
                          dp.getDeploymentRunningInstanceId().equals(entityToMigrate.getDeploymentRunningInstanceId()))