You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/05/03 15:39:33 UTC

[incubator-streampipes] branch edge-extensions updated (c84435e -> 3b5901b)

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a change to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from c84435e  [WIP] update node resource requirements in model, ui, SDK
     new 8aedda4  [WIP] refactored node management structure and add healthcheck monitoring for nodes
     new 3b5901b  add skeleton for node resource management

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |   1 +
 streampipes-backend/pom.xml                        |   6 +
 .../backend/StreamPipesBackendApplication.java     |   6 +-
 .../backend/StreamPipesResourceConfig.java         |   2 +-
 .../apache/streampipes/model/NodeHealthStatus.java |  76 ++++++++
 .../model/grounding/TransportProtocol.java         |   1 +
 .../streampipes/model/node/NodeCondition.java      |  22 ++-
 .../model/node/NodeInfoDescription.java            |  23 +++
 .../model/node/NodeInfoDescriptionBuilder.java     |   1 +
 .../model/node/container/DeploymentContainer.java  |  51 ++++-
 .../model/node/container/DockerContainer.java      |   6 +-
 .../node/container/DockerContainerBuilder.java     |  19 +-
 .../SupportedArchitectures.java}                   |  23 ++-
 .../SupportedOsType.java}                          |  23 ++-
 .../model/node/monitor}/ResourceMetrics.java       |   2 +-
 .../api/ContainerDeploymentResource.java           |  16 +-
 .../node/controller/api/HealthCheckResource.java   |  10 +-
 .../node/controller/config/NodeConfiguration.java  |   1 +
 .../container/DockerExtensionsContainer.java       |   7 +
 .../controller/container/DockerKafkaContainer.java |   7 +
 .../container/DockerMosquittoContainer.java        |   7 +
 .../container/DockerZookeeperContainer.java        |   7 +
 .../management/NodeControllerSubmitter.java        |   4 +-
 .../controller/management/node/NodeConstants.java  |   2 +-
 ...tratorManager.java => DockerEngineManager.java} |  24 +--
 ...inerOrchestrator.java => IContainerEngine.java} |   2 +-
 .../docker/DockerContainerDeclarerSingleton.java   |   2 +-
 .../orchestrator/docker/utils/DockerUtils.java     |   2 +-
 .../management/pe/InvocableElementManager.java     |  12 +-
 .../management/resource/ResourceManager.java       |   2 +-
 .../pom.xml                                        |  14 +-
 .../node/management/NodeManagement.java            | 151 +++++++++++++++
 .../management/operation/monitor/NodeMonitor.java  |   8 +-
 .../monitor/health/ClusterHealthCheckMonitor.java  | 189 +++++++++++++++++++
 .../operation/monitor/health/NodeHealthCheck.java  |  64 +++++++
 .../operation/monitor/health/NodeLiveness.java     | 102 ++++++++++
 .../monitor/resource/ClusterResourceMonitor.java   |  65 +++++++
 .../monitor/resource/NodeResourceCollector.java    |  64 +++++++
 .../management/operation/relay/RelayHandler.java   |  60 ++++++
 .../management/operation/relay/RelayOperation.java |  12 +-
 .../operation/sync/SynchronizationFactory.java     |  44 +++++
 .../operation/sync/SynchronizationHandler.java     |  77 ++++++++
 .../operation/sync/SynchronizationType.java        |  10 +-
 .../node/management/utils/HttpRequest.java         |   7 +-
 .../node/management/utils/HttpUtils.java           | 130 +++++++++++++
 .../node/management/utils/StorageUtils.java        |  84 +++++++++
 streampipes-pipeline-management/pom.xml            |   5 +
 .../pipeline/AbstractPipelineExecutor.java         | 163 ++++++++++++++--
 .../pipeline/PipelineMigrationExecutor.java        |   9 +-
 .../manager/matching/InvocationGraphBuilder.java   |  44 ++---
 .../migration/MigrationPipelineGenerator.java      |   4 +-
 .../migration/PipelineElementOffloadHandler.java   |  69 +++++++
 .../manager/node/StreamPipesClusterManager.java    | 208 ---------------------
 .../management/cluster/AbstractClusterManager.java | 162 ----------------
 .../management/cluster/AvailableNodesFetcher.java  |  66 -------
 .../node/management/cluster/NodeSyncOptions.java   |  22 ---
 .../management/healthcheck/ClusterHealthCheck.java |  82 --------
 .../resources/ClusterResourceManager.java          |  96 ----------
 .../streampipes/manager/operations/Operations.java |  11 +-
 streampipes-rest/pom.xml                           |   6 +
 .../rest/api/{INode.java => INodeManagement.java}  |  13 +-
 .../{Node.java => NodeManagementResource.java}     |  57 ++----
 .../streampipes/rest/impl/PipelineResource.java    |  12 ++
 .../storage/couchdb/impl/NodeInfoStorageImpl.java  |   8 +-
 .../apache/streampipes/vocabulary/StreamPipes.java |   4 +-
 .../node-configuration.component.html              |  21 ++-
 .../node-configuration.component.scss              |   4 +
 .../node-configuration.component.ts                |  30 ++-
 .../data-marketplace/data-marketplace.component.ts |   4 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  16 +-
 .../migrate-pipeline-processors.component.ts       |   2 +-
 .../save-pipeline/save-pipeline.component.ts       |  53 ++++--
 ui/src/app/platform-services/apis/node.service.ts  |   8 +-
 73 files changed, 1742 insertions(+), 885 deletions(-)
 create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/NodeHealthStatus.java
 copy streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/exception/AdapterException.java => streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeCondition.java (66%)
 copy streampipes-model/src/main/java/org/apache/streampipes/model/node/{resources/software/DockerContainerRuntime.java => container/SupportedArchitectures.java} (60%)
 copy streampipes-model/src/main/java/org/apache/streampipes/model/node/{resources/software/DockerContainerRuntime.java => container/SupportedOsType.java} (62%)
 rename {streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model => streampipes-model/src/main/java/org/apache/streampipes/model/node/monitor}/ResourceMetrics.java (97%)
 rename streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/{DockerOrchestratorManager.java => DockerEngineManager.java} (91%)
 rename streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/{IContainerOrchestrator.java => IContainerEngine.java} (96%)
 copy {streampipes-data-explorer => streampipes-node-management}/pom.xml (87%)
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/NodeManagement.java
 copy streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java => streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/NodeMonitor.java (80%)
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/ClusterHealthCheckMonitor.java
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeLiveness.java
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resource/ClusterResourceMonitor.java
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resource/NodeResourceCollector.java
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/relay/RelayHandler.java
 copy streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationStep.java => streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/relay/RelayOperation.java (84%)
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationFactory.java
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationHandler.java
 copy streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java => streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationType.java (82%)
 copy streampipes-model-client/src/main/java/org/apache/streampipes/model/client/deployment/OutputType.java => streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/HttpRequest.java (89%)
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/HttpUtils.java
 create mode 100644 streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/StorageUtils.java
 create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementOffloadHandler.java
 delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
 delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/AbstractClusterManager.java
 delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/AvailableNodesFetcher.java
 delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
 delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/healthcheck/ClusterHealthCheck.java
 delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
 rename streampipes-rest/src/main/java/org/apache/streampipes/rest/api/{INode.java => INodeManagement.java} (88%)
 rename streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/{Node.java => NodeManagementResource.java} (60%)

[incubator-streampipes] 01/02: [WIP] refactored node management structure and add healthcheck monitoring for nodes

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 8aedda4af4804eace68a8b5ba82dc9d445f2cda2
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon May 3 16:53:43 2021 +0200

    [WIP] refactored node management structure and add healthcheck monitoring for nodes
---
 pom.xml                                            |   1 +
 streampipes-backend/pom.xml                        |   6 +
 .../backend/StreamPipesBackendApplication.java     |   5 +-
 .../backend/StreamPipesResourceConfig.java         |   2 +-
 .../apache/streampipes/model/NodeHealthStatus.java |  76 ++++++++
 .../model/grounding/TransportProtocol.java         |   1 +
 .../streampipes/model/node/NodeCondition.java      |  22 ++-
 .../model/node/NodeInfoDescription.java            |  23 +++
 .../model/node/NodeInfoDescriptionBuilder.java     |   1 +
 .../model/node/container/DeploymentContainer.java  |  51 ++++-
 .../model/node/container/DockerContainer.java      |   6 +-
 .../node/container/DockerContainerBuilder.java     |  19 +-
 .../node/container/SupportedArchitectures.java     |  30 +--
 .../model/node/container/SupportedOsType.java      |  30 +--
 .../api/ContainerDeploymentResource.java           |  16 +-
 .../node/controller/api/HealthCheckResource.java   |  10 +-
 .../api/NodeInfoDescriptionResource.java           |   1 +
 .../node/controller/config/NodeConfiguration.java  |   1 +
 .../container/DockerExtensionsContainer.java       |   7 +
 .../controller/container/DockerKafkaContainer.java |   7 +
 .../container/DockerMosquittoContainer.java        |   7 +
 .../container/DockerZookeeperContainer.java        |   7 +
 .../management/NodeControllerSubmitter.java        |   4 +-
 .../controller/management/node/NodeConstants.java  |   2 +-
 ...tratorManager.java => DockerEngineManager.java} |  24 +--
 ...inerOrchestrator.java => IContainerEngine.java} |   2 +-
 .../docker/DockerContainerDeclarerSingleton.java   |   2 +-
 .../orchestrator/docker/utils/DockerUtils.java     |   2 +-
 .../management/pe/InvocableElementManager.java     |  12 +-
 streampipes-node-management/pom.xml                |  50 +++++
 .../node/management/NodeManagement.java            | 126 +++++++++++++
 .../monitor/health/ClusterHealthCheckMonitor.java  | 189 +++++++++++++++++++
 .../operation/monitor/health/HealthCheck.java      |  11 +-
 .../operation/monitor/health/NodeHealthCheck.java  |  63 +++++++
 .../operation/monitor/health/NodeLiveness.java     | 102 ++++++++++
 .../monitor/resources/ClusterResourceMonitor.java  |  26 +--
 .../management/operation/relay/RelayHandler.java   |  60 ++++++
 .../management/operation/relay/RelayOperation.java |   8 +-
 .../operation/sync/SynchronizationFactory.java     |  44 +++++
 .../operation/sync/SynchronizationHandler.java     |  77 ++++++++
 .../operation/sync/SynchronizationType.java        |   9 +-
 .../node/management/utils/HttpRequest.java         |   6 +-
 .../node/management/utils/HttpUtils.java           | 130 +++++++++++++
 .../node/management/utils/StorageUtils.java        |  84 +++++++++
 streampipes-pipeline-management/pom.xml            |   5 +
 .../pipeline/AbstractPipelineExecutor.java         | 163 ++++++++++++++--
 .../pipeline/PipelineMigrationExecutor.java        |   9 +-
 .../manager/matching/InvocationGraphBuilder.java   |  44 ++---
 .../migration/MigrationPipelineGenerator.java      |   4 +-
 .../migration/PipelineElementOffloadHandler.java   |  69 +++++++
 .../manager/node/StreamPipesClusterManager.java    | 208 ---------------------
 .../management/cluster/AbstractClusterManager.java | 162 ----------------
 .../management/cluster/AvailableNodesFetcher.java  |  66 -------
 .../management/healthcheck/ClusterHealthCheck.java |  82 --------
 .../streampipes/manager/operations/Operations.java |  11 +-
 streampipes-rest/pom.xml                           |   6 +
 .../rest/api/{INode.java => INodeManagement.java}  |  13 +-
 .../{Node.java => NodeManagementResource.java}     |  57 ++----
 .../streampipes/rest/impl/PipelineResource.java    |  12 ++
 .../storage/couchdb/impl/NodeInfoStorageImpl.java  |   8 +-
 .../apache/streampipes/vocabulary/StreamPipes.java |   4 +-
 .../node-configuration.component.html              |  21 ++-
 .../node-configuration.component.scss              |   4 +
 .../node-configuration.component.ts                |  30 ++-
 .../data-marketplace/data-marketplace.component.ts |   4 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  16 +-
 .../migrate-pipeline-processors.component.ts       |   2 +-
 .../save-pipeline/save-pipeline.component.ts       |  53 ++++--
 ui/src/app/platform-services/apis/node.service.ts  |   8 +-
 69 files changed, 1659 insertions(+), 764 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2cfd0a7..0b6ce77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -946,6 +946,7 @@
         <module>streampipes-wrapper-siddhi</module>
         <module>streampipes-wrapper-spark</module>
         <module>streampipes-wrapper-standalone</module>
+        <module>streampipes-node-management</module>
     </modules>
 
     <profiles>
diff --git a/streampipes-backend/pom.xml b/streampipes-backend/pom.xml
index c5c7a56..96f34b2 100644
--- a/streampipes-backend/pom.xml
+++ b/streampipes-backend/pom.xml
@@ -93,6 +93,12 @@
             <groupId>com.fasterxml.jackson.dataformat</groupId>
             <artifactId>jackson-dataformat-yaml</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-node-management</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 899eb8d..c5ca79b 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -23,6 +23,7 @@ import org.apache.shiro.web.servlet.ShiroFilter;
 import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.node.management.operation.monitor.health.ClusterHealthCheckMonitor;
 import org.apache.streampipes.rest.notifications.NotificationListener;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.slf4j.Logger;
@@ -58,8 +59,10 @@ public class StreamPipesBackendApplication {
   @PostConstruct
   public void init() {
     ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
-
     executorService.schedule(this::startAllPreviouslyStoppedPipelines, 5, TimeUnit.SECONDS);
+
+    LOG.info("Starting StreamPipes cluster monitor...");
+    ClusterHealthCheckMonitor.getInstance().run();
   }
 
   @PreDestroy
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index 46a3c62..96cfa82 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -101,7 +101,7 @@ public class StreamPipesResourceConfig extends ResourceConfig {
     register(VirtualSensor.class);
     register(Visualization.class);
     register(VisualizablePipeline.class);
-    register(Node.class);
+    register(NodeManagementResource.class);
 
 
     // Serializers
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/NodeHealthStatus.java b/streampipes-model/src/main/java/org/apache/streampipes/model/NodeHealthStatus.java
new file mode 100644
index 0000000..123292e
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/NodeHealthStatus.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model;
+
+public class NodeHealthStatus {
+
+    private String nodeControllerId;
+    private Boolean success;
+    private long timestamp;
+    private String optionalMessage;
+
+    public NodeHealthStatus() {
+    }
+
+    public NodeHealthStatus(Boolean success) {
+        this.success = success;
+    }
+
+    public NodeHealthStatus(String nodeControllerId, Boolean success, long timestamp, String optionalMessage) {
+        this.nodeControllerId = nodeControllerId;
+        this.success = success;
+        this.timestamp = timestamp;
+        this.optionalMessage = optionalMessage;
+    }
+
+    public String getNodeControllerId() {
+        return nodeControllerId;
+    }
+
+    public void setNodeControllerId(String nodeControllerId) {
+        this.nodeControllerId = nodeControllerId;
+    }
+
+    public Boolean getSuccess() {
+        return success;
+    }
+
+    public Boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(Boolean success) {
+        this.success = success;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public String getOptionalMessage() {
+        return optionalMessage;
+    }
+
+    public void setOptionalMessage(String optionalMessage) {
+        this.optionalMessage = optionalMessage;
+    }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java
index 8d89064..ed1f4d4 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/TransportProtocol.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.model.util.Cloner;
 import org.apache.streampipes.vocabulary.StreamPipes;
 
 import javax.persistence.*;
+import java.util.Objects;
 
 @RdfsClass(StreamPipes.TRANSPORT_PROTOCOL)
 @Entity
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeCondition.java
similarity index 65%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeCondition.java
index ae67076..9c4cc97 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeCondition.java
@@ -15,8 +15,24 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.node.management.cluster;
+package org.apache.streampipes.model.node;
 
-public enum NodeSyncOptions {
-    ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE,RESTART_RELAYS, HEALTHY;
+public enum NodeCondition {
+    ACTIVE("active"),
+    INACTIVE("inactive"),
+    ONLINE("online"),
+    OFFLINE("offline"),
+    CREATED("created"),
+    CONNECTING("connecting"),
+    PAUSED("paused");
+
+    private final String condition;
+
+    NodeCondition(String condition) {
+        this.condition = condition;
+    }
+
+    public String getCondition() {
+        return condition;
+    }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java
index 182abe6..8cf0547 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.gson.annotations.SerializedName;
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
+import org.apache.streampipes.model.NodeHealthStatus;
 import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
 import org.apache.streampipes.model.node.container.DeploymentContainer;
 import org.apache.streampipes.model.node.meta.StaticNodeMetadata;
@@ -49,6 +50,12 @@ public class NodeInfoDescription extends UnnamedStreamPipesEntity {
     @OneToOne(cascade = CascadeType.ALL)
     private boolean active;
 
+    @OneToOne(cascade = CascadeType.ALL)
+    private NodeCondition condition;
+
+    @OneToOne(cascade = CascadeType.ALL)
+    private long lastHeartBeatTime;
+
     @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_ID)
     private String nodeControllerId;
 
@@ -185,4 +192,20 @@ public class NodeInfoDescription extends UnnamedStreamPipesEntity {
     public void setSupportedElements(List<String> supportedElements) {
         this.supportedElements = supportedElements;
     }
+
+    public NodeCondition getCondition() {
+        return condition;
+    }
+
+    public void setCondition(NodeCondition condition) {
+        this.condition = condition;
+    }
+
+    public long getLastHeartBeatTime() {
+        return lastHeartBeatTime;
+    }
+
+    public void setLastHeartBeatTime(long lastHeartBeatTime) {
+        this.lastHeartBeatTime = lastHeartBeatTime;
+    }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java
index d1ba465..4e7e810 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescriptionBuilder.java
@@ -139,6 +139,7 @@ public class NodeInfoDescriptionBuilder {
         nodeInfoDescription.setNodeResources(nodeResources);
         nodeInfoDescription.setSupportedElements(supportedElements);
         nodeInfoDescription.setActive(true);
+        nodeInfoDescription.setCondition(NodeCondition.CREATED);
         return nodeInfoDescription;
     }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DeploymentContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DeploymentContainer.java
index dad53ad..ec90467 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DeploymentContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DeploymentContainer.java
@@ -38,8 +38,8 @@ import java.util.Map;
 @TsModel
 public abstract class DeploymentContainer extends UnnamedStreamPipesEntity {
 
-    @RdfProperty(StreamPipes.DEPLOYMENT_CONTAINER_IMAGE_URI)
-    private String imageUri;
+    @RdfProperty(StreamPipes.DEPLOYMENT_CONTAINER_IMAGE_TAG)
+    private String imageTag;
 
     @RdfProperty(StreamPipes.DEPLOYMENT_CONTAINER_NAME)
     private String containerName;
@@ -67,6 +67,17 @@ public abstract class DeploymentContainer extends UnnamedStreamPipesEntity {
 
     @OneToMany(fetch = FetchType.EAGER,
             cascade = {CascadeType.ALL})
+    @RdfProperty(StreamPipes.DEPLOYMENT_SUPPORTED_ARCHITECTURES)
+    private List<String> supportedArchitectures;
+
+    @OneToMany(fetch = FetchType.EAGER,
+            cascade = {CascadeType.ALL})
+    @RdfProperty(StreamPipes.DEPLOYMENT_SUPPORTED_OS_TYPES)
+    private List<String> supportedOperatingSystemTypes;
+
+
+    @OneToMany(fetch = FetchType.EAGER,
+            cascade = {CascadeType.ALL})
     @RdfProperty(StreamPipes.DEPLOYMENT_CONTAINER_DEPENDENCIES)
     private List<String> dependsOnContainers;
 
@@ -75,6 +86,8 @@ public abstract class DeploymentContainer extends UnnamedStreamPipesEntity {
         this.labels = new HashMap<>();
         this.volumes = new ArrayList<>();
         this.dependsOnContainers = new ArrayList<>();
+        this.supportedArchitectures = new ArrayList<>();
+        this.supportedOperatingSystemTypes = new ArrayList<>();
     }
 
     public DeploymentContainer(String elementId) {
@@ -83,30 +96,35 @@ public abstract class DeploymentContainer extends UnnamedStreamPipesEntity {
         this.labels = new HashMap<>();
         this.volumes = new ArrayList<>();
         this.dependsOnContainers = new ArrayList<>();
+        this.supportedArchitectures = new ArrayList<>();
+        this.supportedOperatingSystemTypes = new ArrayList<>();
     }
 
     public DeploymentContainer(DeploymentContainer other) {
         super(other);
     }
 
-    public DeploymentContainer(String imageUri, String containerName, String serviceId, String[] containerPorts,
+    public DeploymentContainer(String imageTag, String containerName, String serviceId, String[] containerPorts,
                                List<String> envVars, Map<String, String> labels, List<String> volumes,
+                               List<String> supportedArchitectures, List<String> supportedOperatingSystemsTypes,
                                List<String> dependsOnContainers) {
-        this.imageUri = imageUri;
+        this.imageTag = imageTag;
         this.containerName = containerName;
         this.serviceId = serviceId;
         this.containerPorts = containerPorts;
         this.envVars = envVars;
         this.labels = labels;
         this.volumes = volumes;
+        this.supportedArchitectures = supportedArchitectures;
+        this.supportedOperatingSystemTypes = supportedOperatingSystemsTypes;
         this.dependsOnContainers = dependsOnContainers;
     }
 
-    public String getImageUri() {
-        return imageUri;
+    public String getImageTag() {
+        return imageTag;
     }
-    public void setImageUri(String imageUri) {
-        this.imageUri = imageUri;
+    public void setImageTag(String imageTag) {
+        this.imageTag = imageTag;
     }
 
     public String getContainerName() {
@@ -164,4 +182,21 @@ public abstract class DeploymentContainer extends UnnamedStreamPipesEntity {
     public void setDependsOnContainers(List<String> dependsOnContainers) {
         this.dependsOnContainers = dependsOnContainers;
     }
+
+    public List<String> getSupportedArchitectures() {
+        return supportedArchitectures;
+    }
+
+    public void setSupportedArchitectures(List<String> supportedArchitectures) {
+        this.supportedArchitectures = supportedArchitectures;
+    }
+
+    public List<String> getSupportedOperatingSystemTypes() {
+        return supportedOperatingSystemTypes;
+    }
+
+    public void setSupportedOperatingSystemTypes(List<String> supportedOperatingSystemTypes) {
+        this.supportedOperatingSystemTypes = supportedOperatingSystemTypes;
+    }
+
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DockerContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DockerContainer.java
index f66e67e..68c7182 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DockerContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DockerContainer.java
@@ -38,9 +38,11 @@ public class DockerContainer extends DeploymentContainer {
         super();
     }
 
-    public DockerContainer(String imageURI, String containerName, String serviceId, String[] containerPorts,
+    public DockerContainer(String imageTag, String containerName, String serviceId, String[] containerPorts,
                            List<String> envVars, Map<String, String> labels, List<String> volumes,
+                           List<String> supportedArchitectures, List<String> supportedOperatingSystemTypes,
                            List<String> dependsOnContainers) {
-        super(imageURI, containerName, serviceId, containerPorts, envVars, labels, volumes, dependsOnContainers);
+        super(imageTag, containerName, serviceId, containerPorts, envVars, labels, volumes, supportedArchitectures,
+                supportedOperatingSystemTypes, dependsOnContainers);
     }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DockerContainerBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DockerContainerBuilder.java
index 349df93..86a59b6 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DockerContainerBuilder.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/DockerContainerBuilder.java
@@ -30,8 +30,8 @@ public class DockerContainerBuilder {
         return new DockerContainerBuilder(id);
     }
 
-    public DockerContainerBuilder withImage(String imageUri) {
-        this.dockerContainer.setImageUri(imageUri);
+    public DockerContainerBuilder withImage(String imageTag) {
+        this.dockerContainer.setImageTag(imageTag);
         return this;
     }
 
@@ -65,6 +65,21 @@ public class DockerContainerBuilder {
         return this;
     }
 
+    public DockerContainerBuilder supportedArchitectures(List<String> supportedArchitectures) {
+        this.dockerContainer.setSupportedArchitectures(supportedArchitectures);
+        return this;
+    }
+
+    public DockerContainerBuilder supportedArchitectures(String ... architectures) {
+        this.dockerContainer.setSupportedArchitectures(Arrays.asList(architectures));
+        return this;
+    }
+
+    public DockerContainerBuilder supportedOperatingSystemTypes(String ... operatingSystems) {
+        this.dockerContainer.setSupportedOperatingSystemTypes(Arrays.asList(operatingSystems));
+        return this;
+    }
+
     public DockerContainer build() {
         return dockerContainer;
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/HealthCheckResource.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/SupportedArchitectures.java
similarity index 56%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/HealthCheckResource.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/node/container/SupportedArchitectures.java
index f4ac429..0d7aa04 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/HealthCheckResource.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/SupportedArchitectures.java
@@ -15,22 +15,26 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.api;
+package org.apache.streampipes.model.node.container;
 
-import org.apache.streampipes.node.controller.config.NodeConfiguration;
+import org.apache.streampipes.vocabulary.StreamPipes;
 
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
+public class SupportedArchitectures {
 
-@Path("/healthy")
-public class HealthCheckResource extends AbstractResource {
+    private static final String ARCHITECTURE_NAMESPACE = StreamPipes.NS + "architecture#";
+    public static final String AMD = ARCHITECTURE_NAMESPACE + "x86_64";
+    public static final String ARM32 = ARCHITECTURE_NAMESPACE + "arm32";
+    public static final String ARM64 = ARCHITECTURE_NAMESPACE + "aarch64";
 
-    @GET
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response getHealth() {
-        return ok(String.format("PONG: %s", NodeConfiguration.getNodeControllerId()));
+    public static String amd(){
+        return AMD;
+    }
+
+    public static String arm32v7(){
+        return ARM32;
+    }
+
+    public static String arm64v8(){
+        return ARM64;
     }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/HealthCheckResource.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/SupportedOsType.java
similarity index 57%
copy from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/HealthCheckResource.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/node/container/SupportedOsType.java
index f4ac429..196d820 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/HealthCheckResource.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/container/SupportedOsType.java
@@ -15,22 +15,26 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.api;
+package org.apache.streampipes.model.node.container;
 
-import org.apache.streampipes.node.controller.config.NodeConfiguration;
+import org.apache.streampipes.vocabulary.StreamPipes;
 
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
+public class SupportedOsType {
 
-@Path("/healthy")
-public class HealthCheckResource extends AbstractResource {
+    private static final String OS_NAMESPACE = StreamPipes.NS + "ostype#";
+    public static final String WINDOWS = OS_NAMESPACE + "windows";
+    public static final String LINUX = OS_NAMESPACE + "linux";
+    public static final String DARWIN = OS_NAMESPACE + "darwin";
 
-    @GET
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response getHealth() {
-        return ok(String.format("PONG: %s", NodeConfiguration.getNodeControllerId()));
+    public static String windows(){
+        return WINDOWS;
+    }
+
+    public static String linux(){
+        return LINUX;
+    }
+
+    public static String darwin(){
+        return DARWIN;
     }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/ContainerDeploymentResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/ContainerDeploymentResource.java
index b7a980d..cc20cb0 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/ContainerDeploymentResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/ContainerDeploymentResource.java
@@ -19,9 +19,10 @@ package org.apache.streampipes.node.controller.api;
 
 import org.apache.streampipes.model.node.container.DockerContainer;
 import org.apache.streampipes.node.controller.management.node.NodeManager;
+import org.apache.streampipes.node.controller.management.orchestrator.docker.DockerContainerDeclarerSingleton;
 import org.apache.streampipes.node.controller.management.orchestrator.status.ContainerDeploymentStatus;
 import org.apache.streampipes.node.controller.management.orchestrator.docker.model.ContainerStatus;
-import org.apache.streampipes.node.controller.management.orchestrator.DockerOrchestratorManager;
+import org.apache.streampipes.node.controller.management.orchestrator.DockerEngineManager;
 import org.apache.streampipes.node.controller.management.pe.InvocableElementManager;
 
 import javax.ws.rs.*;
@@ -33,14 +34,21 @@ public class ContainerDeploymentResource extends AbstractResource {
     @GET
     @Produces(MediaType.APPLICATION_JSON)
     public javax.ws.rs.core.Response getPipelineElementContainer(){
-        return ok(DockerOrchestratorManager.getInstance().list());
+        return ok(DockerEngineManager.getInstance().list());
+    }
+
+    @GET
+    @Path("/registered")
+    @Produces(MediaType.APPLICATION_JSON)
+    public javax.ws.rs.core.Response getAllRegisteredContainer(){
+        return ok(DockerContainerDeclarerSingleton.getInstance().getAllDockerContainerAsList());
     }
 
     @POST
     @Path("/deploy")
     @Consumes(MediaType.APPLICATION_JSON)
     public javax.ws.rs.core.Response deployPipelineElementContainer(DockerContainer container) {
-        ContainerDeploymentStatus status = DockerOrchestratorManager.getInstance().deploy(container);
+        ContainerDeploymentStatus status = DockerEngineManager.getInstance().deploy(container);
 
         if (status.getStatus() == ContainerStatus.DEPLOYED) {
             NodeManager.getInstance().addToRegisteredContainers(status.getContainer());
@@ -52,7 +60,7 @@ public class ContainerDeploymentResource extends AbstractResource {
     @Path("/remove")
     @Consumes(MediaType.APPLICATION_JSON)
     public javax.ws.rs.core.Response removePipelineElementContainer(DockerContainer container) {
-        ContainerDeploymentStatus status = DockerOrchestratorManager.getInstance().remove(container);
+        ContainerDeploymentStatus status = DockerEngineManager.getInstance().remove(container);
 
         if (status.getStatus() == ContainerStatus.REMOVED) {
             InvocableElementManager.getInstance().unregister();
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/HealthCheckResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/HealthCheckResource.java
index f4ac429..5a127a6 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/HealthCheckResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/HealthCheckResource.java
@@ -17,7 +17,10 @@
  */
 package org.apache.streampipes.node.controller.api;
 
+import org.apache.streampipes.model.NodeHealthStatus;
 import org.apache.streampipes.node.controller.config.NodeConfiguration;
+import org.apache.streampipes.node.controller.management.node.NodeManager;
+import org.apache.streampipes.node.controller.management.resource.ResourceManager;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -31,6 +34,11 @@ public class HealthCheckResource extends AbstractResource {
     @GET
     @Produces(MediaType.APPLICATION_JSON)
     public Response getHealth() {
-        return ok(String.format("PONG: %s", NodeConfiguration.getNodeControllerId()));
+        NodeHealthStatus status = new NodeHealthStatus();
+        status.setNodeControllerId(NodeConfiguration.getNodeControllerId());
+        status.setSuccess(true);
+        status.setTimestamp(System.currentTimeMillis());
+        status.setOptionalMessage("I'm a healthy node");
+        return ok(status);
     }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java
index d37e246..4e4aa76 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.node.controller.api;
 
+import org.apache.streampipes.model.node.NodeCondition;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.node.controller.management.node.NodeManager;
 import org.apache.streampipes.node.controller.management.relay.DataStreamRelayManager;
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
index 485d2ad..d27dc46 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
@@ -294,6 +294,7 @@ public final class NodeConfiguration {
                     configMap.put(envKey, value);
                     setStreampipesVersion(value);
                     break;
+
                 case API_KEY:
                     if (!"true".equals(System.getenv("SP_DEBUG"))) {
                         if (!value.isEmpty()) {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
index 140f36f..b4d6433 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
@@ -27,6 +27,13 @@ public class DockerExtensionsContainer extends AbstractStreamPipesDockerContaine
     public DockerContainer declareDockerContainer() {
         return DockerContainerBuilder.create(StreamPipesDockerServiceID.SP_SVC_EXTENSIONS_ID)
                 .withImage("apachestreampipes/extensions-all-jvm:" + getStreamPipesVersion())
+                .supportedArchitectures(
+                        SupportedArchitectures.amd(),
+                        SupportedArchitectures.arm32v7(),
+                        SupportedArchitectures.arm64v8())
+                .supportedOperatingSystemTypes(
+                        SupportedOsType.linux(),
+                        SupportedOsType.darwin())
                 .withContainerName("streampipes-extensions")
                 .withExposedPorts(Ports.withMapping("8090"))
                 .withEnvironmentVariables(ContainerEnvBuilder.create()
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerKafkaContainer.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerKafkaContainer.java
index b9fee60..16ba52e 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerKafkaContainer.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerKafkaContainer.java
@@ -28,6 +28,13 @@ public class DockerKafkaContainer extends AbstractStreamPipesDockerContainer {
     public DockerContainer declareDockerContainer() {
         return DockerContainerBuilder.create(StreamPipesDockerServiceID.SP_SVC_KAFKA_ID)
                 .withImage("fogsyio/kafka:2.2.0")
+                .supportedArchitectures(
+                        SupportedArchitectures.amd(),
+                        SupportedArchitectures.arm32v7(),
+                        SupportedArchitectures.arm64v8())
+                .supportedOperatingSystemTypes(
+                        SupportedOsType.linux(),
+                        SupportedOsType.darwin())
                 .withContainerName("streampipes-node-broker")
                 .dependsOn("svc/org.apache.streampipes.node.broker.zookeeper")
                 .withExposedPorts(Ports.withMapping("9094"))
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerMosquittoContainer.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerMosquittoContainer.java
index b00eb30..b3fff80 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerMosquittoContainer.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerMosquittoContainer.java
@@ -26,6 +26,13 @@ public class DockerMosquittoContainer extends AbstractStreamPipesDockerContainer
     public DockerContainer declareDockerContainer() {
         return DockerContainerBuilder.create(StreamPipesDockerServiceID.SP_SVC_MOSQUITTO_ID)
                 .withImage("eclipse-mosquitto:1.6.12")
+                .supportedArchitectures(
+                        SupportedArchitectures.amd(),
+                        SupportedArchitectures.arm32v7(),
+                        SupportedArchitectures.arm64v8())
+                .supportedOperatingSystemTypes(
+                        SupportedOsType.linux(),
+                        SupportedOsType.darwin())
                 .withContainerName("streampipes-node-broker")
                 .withExposedPorts(Ports.withMapping("1883"))
                 .withEnvironmentVariables(ContainerEnvBuilder.create()
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerZookeeperContainer.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerZookeeperContainer.java
index b83407a..b861084 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerZookeeperContainer.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerZookeeperContainer.java
@@ -26,6 +26,13 @@ public class DockerZookeeperContainer extends AbstractStreamPipesDockerContainer
     public DockerContainer declareDockerContainer() {
         return DockerContainerBuilder.create(StreamPipesDockerServiceID.SP_SVC_ZOOKEEPER_ID)
                 .withImage("fogsyio/zookeeper:3.4.13")
+                .supportedArchitectures(
+                        SupportedArchitectures.amd(),
+                        SupportedArchitectures.arm32v7(),
+                        SupportedArchitectures.arm64v8())
+                .supportedOperatingSystemTypes(
+                        SupportedOsType.linux(),
+                        SupportedOsType.darwin())
                 .withContainerName("streampipes-node-zookeeper")
                 .withExposedPorts(Ports.withMapping("2181"))
                 .withEnvironmentVariables(ContainerEnvBuilder.create()
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
index 2b2d22d..399ebe9 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
@@ -26,7 +26,7 @@ import org.apache.streampipes.node.controller.container.DockerMosquittoContainer
 import org.apache.streampipes.node.controller.container.DockerZookeeperContainer;
 import org.apache.streampipes.node.controller.management.janitor.JanitorManager;
 import org.apache.streampipes.node.controller.management.node.NodeManager;
-import org.apache.streampipes.node.controller.management.orchestrator.DockerOrchestratorManager;
+import org.apache.streampipes.node.controller.management.orchestrator.DockerEngineManager;
 import org.apache.streampipes.node.controller.management.orchestrator.docker.DockerContainerDeclarerSingleton;
 import org.apache.streampipes.node.controller.management.resource.ResourceManager;
 import org.slf4j.Logger;
@@ -75,7 +75,7 @@ public abstract class NodeControllerSubmitter {
                         .register(new DockerZookeeperContainer());
 
                 LOG.info("Auto-deploy extensions and selected broker container");
-                DockerOrchestratorManager.getInstance().init();
+                DockerEngineManager.getInstance().init();
 
                 LOG.info("Start janitor manager");
                 JanitorManager.getInstance().run();
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/NodeConstants.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/NodeConstants.java
index cd88a79..965d230 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/NodeConstants.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/NodeConstants.java
@@ -81,7 +81,7 @@ public class NodeConstants {
                 .forEach(rc -> {
                     DockerContainer c = new DockerContainer();
                     c.setContainerName(rc.names().get(0).replace("/", ""));
-                    c.setImageUri(rc.image());
+                    c.setImageTag(rc.image());
 
                     Optional<String> serviceId = rc.labels().entrySet().stream()
                             .filter(l -> l.getKey().contains("org.apache.streampipes.service.id"))
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/DockerOrchestratorManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/DockerEngineManager.java
similarity index 91%
rename from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/DockerOrchestratorManager.java
rename to streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/DockerEngineManager.java
index af6f06d..40a98f6 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/DockerOrchestratorManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/DockerEngineManager.java
@@ -35,21 +35,21 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
-public class DockerOrchestratorManager implements IContainerOrchestrator {
+public class DockerEngineManager implements IContainerEngine {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(DockerOrchestratorManager.class.getCanonicalName());
+            LoggerFactory.getLogger(DockerEngineManager.class.getCanonicalName());
 
     private final DockerUtils docker = DockerUtils.getInstance();
-    private static DockerOrchestratorManager instance = null;
+    private static DockerEngineManager instance = null;
 
-    private DockerOrchestratorManager() {}
+    private DockerEngineManager() {}
 
-    public static DockerOrchestratorManager getInstance() {
+    public static DockerEngineManager getInstance() {
         if (instance == null) {
-            synchronized (DockerOrchestratorManager.class) {
+            synchronized (DockerEngineManager.class) {
                 if (instance == null)
-                    instance = new DockerOrchestratorManager();
+                    instance = new DockerEngineManager();
             }
         }
         return instance;
@@ -69,10 +69,10 @@ public class DockerOrchestratorManager implements IContainerOrchestrator {
     @Override
     public ContainerDeploymentStatus deploy(DockerContainer container) {
 
-        LOG.info("Pull image and deploy pipeline element container {}", container.getImageUri());
+        LOG.info("Pull image and deploy pipeline element container {}", container.getImageTag());
         Optional<Container> containerOptional = docker.getContainer(container.getContainerName());
         if (!containerOptional.isPresent()) {
-            LOG.info("Deploy pipeline element container \"" + container.getImageUri() + "\"");
+            LOG.info("Deploy pipeline element container \"" + container.getImageTag() + "\"");
             String containerId = "";
             try {
                 containerId = deployPipelineElementContainer(container);
@@ -89,7 +89,7 @@ public class DockerOrchestratorManager implements IContainerOrchestrator {
 
     @Override
     public ContainerDeploymentStatus remove(DockerContainer container) {
-        LOG.info("Remove pipeline element container: {}", container.getImageUri());
+        LOG.info("Remove pipeline element container: {}", container.getImageTag());
 
         Optional<com.spotify.docker.client.messages.Container> containerOptional = docker.getContainer(container.getContainerName());
         if(containerOptional.isPresent()) {
@@ -136,13 +136,13 @@ public class DockerOrchestratorManager implements IContainerOrchestrator {
     private String deployPipelineElementContainer(DockerContainer container, boolean pullImage) throws Exception {
         if (pullImage) {
             try {
-                docker.pullImage(container.getImageUri(), false);
+                docker.pullImage(container.getImageTag(), false);
             } catch (DockerException | InterruptedException e) {
                 LOG.error("unable to pull pipeline element container image {}", e.toString());
                 deployPipelineElementContainer(container, false);
             }
         }
-        if (!pullImage && !docker.findLocalImage(container.getImageUri())) {
+        if (!pullImage && !docker.findLocalImage(container.getImageTag())) {
             throw new NotFoundException("Image not found locally");
         }
         String containerId = docker.createContainer(container);
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/IContainerOrchestrator.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/IContainerEngine.java
similarity index 96%
rename from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/IContainerOrchestrator.java
rename to streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/IContainerEngine.java
index 5a53b92..18a09f2 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/IContainerOrchestrator.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/IContainerEngine.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.node.controller.management.orchestrator;
 import org.apache.streampipes.model.node.container.DockerContainer;
 import org.apache.streampipes.node.controller.management.orchestrator.status.ContainerDeploymentStatus;
 
-public interface IContainerOrchestrator {
+public interface IContainerEngine {
 
    void init();
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/DockerContainerDeclarerSingleton.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/DockerContainerDeclarerSingleton.java
index 1f1fae4..b113cd4 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/DockerContainerDeclarerSingleton.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/DockerContainerDeclarerSingleton.java
@@ -45,7 +45,7 @@ public class DockerContainerDeclarerSingleton {
     }
 
     public DockerContainerDeclarerSingleton register(AbstractStreamPipesDockerContainer container) {
-        LOG.info("Register container for auto-deploy: " + container.declareDockerContainer().getServiceId());
+        LOG.info("Register container descriptions for auto-deployment: " + container.declareDockerContainer().getServiceId());
         this.dockerContainers.put(
                 container.declareDockerContainer().getServiceId(),
                 container.declareDockerContainer());
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
index ebfb2c2..6748799 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
@@ -135,7 +135,7 @@ public class DockerUtils {
         return ContainerConfig.builder()
                 .hostname(p.getContainerName())
                 .tty(true)
-                .image(p.getImageUri())
+                .image(p.getImageTag())
                 .labels(p.getLabels())
                 .env(p.getEnvVars())
                 .hostConfig(getHostConfig(SP_CONTAINER_NETWORK, p.getContainerPorts(), p.getVolumes()))
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
index 401abc7..3e2ad80 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
@@ -134,7 +134,7 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
         // TODO: unregister element from Consul and
         setSupportedPipelineElements(Collections.emptyList());
         try {
-            String url = generateBackendEndpoint();
+            String url = generateNodeManagementUpdateEndpoint();
             String desc = toJson(getNodeInfoDescription());
             Request.Put(url)
                     .bodyString(desc, ContentType.APPLICATION_JSON)
@@ -165,7 +165,7 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
 
     public void postOffloadRequest(InvocableStreamPipesEntity instanceToOffload){
         try {
-            String url = generateBackendOffloadEndpoint();
+            String url = generatePipelineManagementOffloadEndpoint();
             String desc = toJson(instanceToOffload);
             org.apache.http.client.fluent.Response resp = Request.Post(url)
                     .bodyString(desc, ContentType.APPLICATION_JSON)
@@ -260,7 +260,7 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
     private void updateAndSyncNodeInfoDescription(InvocableRegistration registration) {
         setSupportedPipelineElements(registration.getSupportedPipelineElementAppIds());
         try {
-            String url = generateBackendEndpoint();
+            String url = generateNodeManagementUpdateEndpoint();
             String desc = toJson(getNodeInfoDescription());
             Request.Put(url)
                     .bodyString(desc, ContentType.APPLICATION_JSON)
@@ -294,7 +294,7 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
         return new SimpleTopicDefinition( RECONFIGURATION_TOPIC + DOT + runningInstanceId);
     }
 
-    private String generateBackendEndpoint() {
+    private String generateNodeManagementUpdateEndpoint() {
         return HTTP_PROTOCOL
                 + NodeConfiguration.getBackendHost()
                 + COLON
@@ -305,13 +305,13 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
                 + NodeConfiguration.getNodeControllerId();
     }
 
-    private String generateBackendOffloadEndpoint() {
+    private String generatePipelineManagementOffloadEndpoint() {
         return HTTP_PROTOCOL
                 + NodeConfiguration.getBackendHost()
                 + COLON
                 + NodeConfiguration.getBackendPort()
                 + SLASH
-                + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes/offload";
+                + "streampipes-backend/api/v2/users/admin@streampipes.org/pipelines/offload";
     }
 
     private void registerAtConsul(InvocableRegistration registration) {
diff --git a/streampipes-node-management/pom.xml b/streampipes-node-management/pom.xml
new file mode 100644
index 0000000..457329a
--- /dev/null
+++ b/streampipes-node-management/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streampipes-parent</artifactId>
+        <groupId>org.apache.streampipes</groupId>
+        <version>0.68.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streampipes-node-management</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-model</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-storage-api</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-storage-management</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/NodeManagement.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/NodeManagement.java
new file mode 100644
index 0000000..c4add7d
--- /dev/null
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/NodeManagement.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.management;
+
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.message.NotificationType;
+import org.apache.streampipes.model.message.Notifications;
+import org.apache.streampipes.model.node.NodeCondition;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.node.management.operation.monitor.health.ClusterHealthCheckMonitor;
+import org.apache.streampipes.node.management.operation.relay.RelayHandler;
+import org.apache.streampipes.node.management.operation.sync.SynchronizationFactory;
+import org.apache.streampipes.node.management.operation.sync.SynchronizationType;
+import org.apache.streampipes.node.management.utils.StorageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Static entry points for managing cluster nodes: listing, updating, (de-)activating,
+ * deleting and (re-)joining node controllers.
+ */
+public class NodeManagement {
+    private static final Logger LOG = LoggerFactory.getLogger(NodeManagement.class.getCanonicalName());
+
+    /** Returns the nodes the {@link ClusterHealthCheckMonitor} currently reports as healthy. */
+    public static List<NodeInfoDescription> getOnlineNodes() {
+        return ClusterHealthCheckMonitor.getInstance().getAllHealthyNodes();
+    }
+
+    /** Returns all nodes provided by {@link StorageUtils#getAllNodes()}. */
+    public static List<NodeInfoDescription> getAllNodes() {
+        return StorageUtils.getAllNodes();
+    }
+
+    /**
+     * Activates or deactivates a stored node and synchronizes the change via
+     * {@link SynchronizationFactory}.
+     *
+     * @param nodeControllerId id of the node controller to update
+     * @param condition target condition; only ACTIVE and INACTIVE are supported
+     * @return the synchronization result, or {@code false} if no node with that id is stored
+     * @throws SpRuntimeException if {@code condition} is neither ACTIVE nor INACTIVE
+     */
+    public static boolean updateNodeCondition(String nodeControllerId, NodeCondition condition) {
+        Optional<NodeInfoDescription> storedNode = StorageUtils.getNode(nodeControllerId);
+        boolean status = false;
+
+        switch (condition) {
+            case ACTIVE:
+                if (storedNode.isPresent()) {
+                    StorageUtils.activateNode(nodeControllerId);
+                    status = SynchronizationFactory.synchronize(storedNode.get(), SynchronizationType.ACTIVATE_NODE);
+                }
+                break;
+            case INACTIVE:
+                if (storedNode.isPresent()) {
+                    StorageUtils.deactivateNode(nodeControllerId);
+                    status = SynchronizationFactory.synchronize(storedNode.get(), SynchronizationType.DEACTIVATE_NODE);
+                }
+                break;
+            default:
+                throw new SpRuntimeException("Node condition not supported " + condition);
+        }
+        return status;
+    }
+
+    /**
+     * Synchronizes the description (UPDATE_NODE) and stores it only if that succeeded.
+     *
+     * @param desc node description to synchronize and store
+     * @return a success message if the node was updated, an error message otherwise
+     */
+    public static Message updateNode(NodeInfoDescription desc) {
+        boolean successfullyUpdated = SynchronizationFactory.synchronize(desc, SynchronizationType.UPDATE_NODE);
+        if (successfullyUpdated) {
+            StorageUtils.updateNode(desc);
+            return Notifications.success("Node updated");
+        }
+        return Notifications.error("Could not update node");
+    }
+
+    /** Deletes the node with the given node controller id from storage. */
+    public static void deleteNode(String nodeControllerId) {
+        StorageUtils.deleteNode(nodeControllerId);
+    }
+
+    /** Stores the given description without triggering a synchronization. */
+    public static Message syncRemoteNodeUpdateRequest(NodeInfoDescription desc) {
+        StorageUtils.updateNode(desc);
+        return Notifications.success("Node updated");
+    }
+
+    /**
+     * Registers a node that is not yet stored, or re-synchronizes an already stored one.
+     *
+     * @param desc description sent by the joining node
+     * @return the message produced by the join or the re-join path
+     */
+    public static Message addOrRejoin(NodeInfoDescription desc) {
+        Optional<NodeInfoDescription> latestDesc = StorageUtils.getLatestNodeOrElseEmpty(desc.getNodeControllerId());
+
+        if (!latestDesc.isPresent()) {
+            LOG.info("New cluster node join request from http://{}:{}", desc.getHostname(), desc.getPort());
+            return addNewNode(desc);
+        }
+        LOG.info("Re-joined cluster node from http://{}:{}", desc.getHostname(), desc.getPort());
+        return rejoinAndSyncNode(latestDesc.get());
+    }
+
+    /** Stores a new node; a storage failure is reported as a NODE_JOIN_ERROR error message. */
+    private static Message addNewNode(NodeInfoDescription desc) {
+        try {
+            StorageUtils.storeNode(desc);
+            LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
+            return Notifications.success(NotificationType.NODE_JOIN_SUCCESS);
+        } catch (Exception e) {
+            // keep the cause visible instead of silently swallowing it
+            LOG.error("Could not store joining node http://{}:{}", desc.getHostname(), desc.getPort(), e);
+            return Notifications.error(NotificationType.NODE_JOIN_ERROR);
+        }
+    }
+
+    /** Synchronizes the stored description (UPDATE_NODE) and, on success, restarts the relays. */
+    private static Message rejoinAndSyncNode(NodeInfoDescription desc) {
+        LOG.info("Sync latest node description to http://{}:{}", desc.getHostname(), desc.getPort());
+        boolean success = SynchronizationFactory.synchronize(desc, SynchronizationType.UPDATE_NODE);
+        if (success) {
+            return new RelayHandler(desc).restart();
+        }
+        // a failed synchronization must not be reported as a success message
+        return Notifications.error(NotificationType.NODE_JOIN_ERROR);
+    }
+
+}
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/ClusterHealthCheckMonitor.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/ClusterHealthCheckMonitor.java
new file mode 100644
index 0000000..70c8fba
--- /dev/null
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/ClusterHealthCheckMonitor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.management.operation.monitor.health;
+
+import org.apache.streampipes.model.NodeHealthStatus;
+import org.apache.streampipes.model.node.NodeCondition;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.node.management.NodeManagement;
+import org.apache.streampipes.node.management.utils.HttpUtils;
+import org.apache.streampipes.node.management.utils.StorageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Singleton that periodically requests the health-check route of every stored node and keeps an
+ * in-memory {@link NodeLiveness} record per node controller id.
+ */
+public class ClusterHealthCheckMonitor {
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterHealthCheckMonitor.class.getCanonicalName());
+
+    private static final String HEALTH_CHECK_ROUTE = "/healthy";
+    private static final int HEALTH_CHECK_INTERVAL_SECS = 10;
+    private static final int MAX_FAILED_LIVENESS_CHECKS = 3;
+    // accessed from the scheduler thread and from callers of getAllHealthyNodes()
+    private static final Map<String, NodeLiveness> inMemoryNodeLivenessStore = new ConcurrentHashMap<>();
+
+    // volatile makes the double-checked locking in getInstance() safe
+    private static volatile ClusterHealthCheckMonitor instance = null;
+
+    private ClusterHealthCheckMonitor() {
+    }
+
+    /** Returns the lazily created singleton instance. */
+    public static ClusterHealthCheckMonitor getInstance() {
+        if (instance == null) {
+            synchronized (ClusterHealthCheckMonitor.class) {
+                if (instance == null) {
+                    instance = new ClusterHealthCheckMonitor();
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Starts a single-threaded scheduler that runs the health check immediately and then every
+     * {@code HEALTH_CHECK_INTERVAL_SECS} seconds. Each call creates a new scheduler.
+     */
+    public void run() {
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleAtFixedRate(healthCheck, 0, HEALTH_CHECK_INTERVAL_SECS, TimeUnit.SECONDS);
+    }
+
+    // An exception escaping a task passed to scheduleAtFixedRate suppresses all subsequent
+    // executions, so failures are logged here instead of being propagated.
+    private final Runnable healthCheck = () -> {
+        try {
+            StorageUtils.persistentNodeAPI().getAllNodes().forEach(this::checkNodeSafely);
+        } catch (RuntimeException e) {
+            LOG.error("Health check run failed", e);
+        }
+    };
+
+    /** Checks one node and logs any failure so that the remaining nodes are still checked. */
+    private void checkNodeSafely(NodeInfoDescription node) {
+        try {
+            checkNode(node);
+        } catch (RuntimeException e) {
+            LOG.error("Health check failed for node {}", node.getNodeControllerId(), e);
+        }
+    }
+
+    /**
+     * Performs the health check for one node and applies the resulting state transition
+     * (CREATED/OFFLINE to ONLINE on success, ONLINE to OFFLINE after too many failures).
+     *
+     * @throws RuntimeException if the node's last known condition is not handled
+     */
+    private void checkNode(NodeInfoDescription node) {
+        String nodeControllerId = node.getNodeControllerId();
+
+        // add to inMemoryNodeLivenessStore if not already existing
+        addNodeToInMemoryStorage(node);
+
+        NodeLiveness nodeLiveness = inMemoryNodeLivenessStore.get(nodeControllerId);
+
+        int failedChecks = nodeLiveness.getNumFailedLivenessChecks();
+        int maxFailedChecks = nodeLiveness.getMaxNumFailedLivenessChecks();
+        NodeCondition lastNodeCondition = nodeLiveness.getCondition();
+
+        // conduct actual health check
+        String endpoint = HttpUtils.generateEndpoint(node, HEALTH_CHECK_ROUTE);
+        NodeHealthStatus status = new NodeHealthCheck(endpoint).execute();
+
+        switch (lastNodeCondition) {
+            case ONLINE:
+                if (status.isSuccess()) {
+                    // reset failed checks (if any)
+                    if (failedChecks > 0) {
+                        LOG.info(String.format("Healthcheck successful - %s is back online", endpoint));
+                        nodeLiveness.resetNumFailedLivenessChecks();
+                    }
+
+                    LOG.debug(String.format("Healthcheck successful - %s is still online", endpoint));
+                    // set timestamp from healthcheck
+                    node.setLastHeartBeatTime(status.getTimestamp());
+                    // update node description in DB
+                    persistNodeUpdate(node, false);
+                } else if (failedChecks < maxFailedChecks) {
+                    // unsuccessful healthcheck, but still within the allowed limit
+                    nodeLiveness.increaseFailedChecks();
+                    LOG.info(String.format("Healthcheck failed - (%s/%s) for %s ",
+                            failedChecks + 1,
+                            maxFailedChecks,
+                            endpoint));
+                } else {
+                    // too many failed checks: node is considered offline
+                    LOG.info(String.format("Too many failed healtchecks - %s is considered offline",
+                            nodeControllerId));
+
+                    nodeLiveness.setOffline();
+                    node.setCondition(NodeCondition.OFFLINE);
+
+                    // update node description in DB
+                    persistNodeUpdate(node, false);
+                }
+                break;
+
+            case OFFLINE:
+                // node was considered offline and is now online again
+                if (status.isSuccess()) {
+                    LOG.info(String.format("Healthcheck successful - %s is online", endpoint));
+
+                    // set node online
+                    nodeLiveness.setOnline();
+                    node.setCondition(NodeCondition.ONLINE);
+                    // set timestamp from healthcheck
+                    node.setLastHeartBeatTime(status.getTimestamp());
+                    // update node description in DB
+                    persistNodeUpdate(node, false);
+                }
+                // otherwise still offline ... await reconnection
+                break;
+
+            case CREATED:
+                // newly initialized nodes are in CREATED state
+                if (status.isSuccess()) {
+                    LOG.info(String.format("Healthcheck successful - %s is online", endpoint));
+
+                    // set node online
+                    nodeLiveness.setOnline();
+                    node.setCondition(NodeCondition.ONLINE);
+                    // set timestamp from healthcheck
+                    node.setLastHeartBeatTime(status.getTimestamp());
+                    // update node description in DB and sync initial online status to node controller as feedback
+                    persistNodeUpdate(node, true);
+                }
+                break;
+
+            default:
+                throw new RuntimeException("Node condition unsupported " + lastNodeCondition);
+        }
+        // nodeLiveness is the instance held by the store and was mutated in place; no write-back needed
+    }
+
+    /** Stores the node; with syncWithNodeController it goes through NodeManagement.updateNode instead. */
+    private void persistNodeUpdate(NodeInfoDescription node, boolean syncWithNodeController) {
+        if (syncWithNodeController) {
+            NodeManagement.updateNode(node);
+        } else {
+            StorageUtils.updateNode(node);
+        }
+    }
+
+    /** Creates the liveness record (condition CREATED) for a node that has none yet. */
+    private void addNodeToInMemoryStorage(NodeInfoDescription node) {
+        inMemoryNodeLivenessStore.computeIfAbsent(node.getNodeControllerId(),
+                id -> new NodeLiveness(id, NodeCondition.CREATED, MAX_FAILED_LIVENESS_CHECKS));
+    }
+
+    /**
+     * Returns the stored descriptions of all nodes whose in-memory condition is ONLINE.
+     */
+    public List<NodeInfoDescription> getAllHealthyNodes() {
+        List<NodeInfoDescription> onlineNodes = new ArrayList<>();
+        inMemoryNodeLivenessStore.forEach((id, value) -> {
+            if (value.getCondition() == NodeCondition.ONLINE) {
+                Optional<NodeInfoDescription> node = StorageUtils.getNode(id);
+                node.ifPresent(onlineNodes::add);
+            }
+        });
+        return onlineNodes;
+    }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/HealthCheck.java
similarity index 73%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
copy to streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/HealthCheck.java
index ae67076..784cb9c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/HealthCheck.java
@@ -15,8 +15,13 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.node.management.cluster;
+package org.apache.streampipes.node.management.operation.monitor.health;
 
-public enum NodeSyncOptions {
-    ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE,RESTART_RELAYS, HEALTHY;
+import org.apache.streampipes.model.NodeHealthStatus;
+
+import java.util.concurrent.Callable;
+
+public interface HealthCheck {
+    Callable<NodeHealthStatus> nodeHealthStatusCallable();
+    NodeHealthStatus execute();
 }
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java
new file mode 100644
index 0000000..a0a0dc3
--- /dev/null
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.management.operation.monitor.health;
+
+import org.apache.streampipes.model.NodeHealthStatus;
+import org.apache.streampipes.node.management.utils.HttpUtils;
+
+import java.util.concurrent.*;
+
+/**
+ * Executes a single, time-bounded health check request against one endpoint.
+ */
+public class NodeHealthCheck implements HealthCheck {
+
+    private static final int HEALTH_CHECK_FUTURE_TIMEOUT_SECS = 3;
+    private final String healthCheckEndpoint;
+
+    public NodeHealthCheck(String endpoint) {
+        this.healthCheckEndpoint = endpoint;
+    }
+
+    /**
+     * Runs the health check on a worker thread and waits at most
+     * {@code HEALTH_CHECK_FUTURE_TIMEOUT_SECS} seconds for its result.
+     *
+     * @return the status returned by the request, or {@code new NodeHealthStatus(false)} if the
+     *         request failed, timed out or the waiting thread was interrupted
+     */
+    @Override
+    public NodeHealthStatus execute() {
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try {
+            final Future<NodeHealthStatus> future = executorService.submit(nodeHealthStatusCallable());
+            try {
+                // blocking call until timeout is reached
+                return future.get(HEALTH_CHECK_FUTURE_TIMEOUT_SECS, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                // restore the interrupt flag so the caller can still observe the interruption
+                Thread.currentThread().interrupt();
+                future.cancel(true);
+                return new NodeHealthStatus(false);
+            } catch (ExecutionException e) {
+                return new NodeHealthStatus(false);
+            } catch (TimeoutException e) {
+                future.cancel(true);
+                return new NodeHealthStatus(false);
+            }
+        } finally {
+            // release the worker thread on every path, including unexpected exceptions
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Builds the task that calls {@code HttpUtils.get} on the endpoint; it skips the call and
+     * returns {@code new NodeHealthStatus(false)} if its thread is already interrupted.
+     */
+    @Override
+    public Callable<NodeHealthStatus> nodeHealthStatusCallable() {
+        return () -> {
+            if (!Thread.currentThread().isInterrupted()) {
+                return HttpUtils.get(healthCheckEndpoint, NodeHealthStatus.class);
+            }
+            return new NodeHealthStatus(false);
+        };
+    }
+}
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeLiveness.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeLiveness.java
new file mode 100644
index 0000000..37edb95
--- /dev/null
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeLiveness.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.management.operation.monitor.health;
+
+import org.apache.streampipes.model.node.NodeCondition;
+
+/**
+ * In-memory liveness record of one node: its condition, last heartbeat time and the number of
+ * failed liveness checks together with the allowed maximum.
+ */
+public class NodeLiveness {
+
+    private String nodeControllerId;
+    private NodeCondition condition;
+    private long lastHeartBeatTime;
+    private int numFailedLivenessChecks;
+    private int maxNumFailedLivenessChecks;
+
+    public NodeLiveness() {
+    }
+
+    public NodeLiveness(String nodeControllerId, NodeCondition condition, int maxNumFailedLivenessChecks) {
+        this.nodeControllerId = nodeControllerId;
+        this.condition = condition;
+        this.maxNumFailedLivenessChecks = maxNumFailedLivenessChecks;
+    }
+
+    /** Creates a record whose condition starts as {@link NodeCondition#CREATED}. */
+    public NodeLiveness(String nodeControllerId, int maxNumFailedLivenessChecks) {
+        this(nodeControllerId, NodeCondition.CREATED, maxNumFailedLivenessChecks);
+    }
+
+    public String getNodeControllerId() {
+        return nodeControllerId;
+    }
+
+    public void setNodeControllerId(String nodeControllerId) {
+        this.nodeControllerId = nodeControllerId;
+    }
+
+    public NodeCondition getCondition() {
+        return condition;
+    }
+
+    public void setCondition(NodeCondition condition) {
+        this.condition = condition;
+    }
+
+    public long getLastHeartBeatTime() {
+        return lastHeartBeatTime;
+    }
+
+    public void setLastHeartBeatTime(long lastHeartBeatTime) {
+        this.lastHeartBeatTime = lastHeartBeatTime;
+    }
+
+    public int getNumFailedLivenessChecks() {
+        return numFailedLivenessChecks;
+    }
+
+    public void setNumFailedLivenessChecks(int numFailedLivenessChecks) {
+        this.numFailedLivenessChecks = numFailedLivenessChecks;
+    }
+
+    public int getMaxNumFailedLivenessChecks() {
+        return maxNumFailedLivenessChecks;
+    }
+
+    /** Marks the node ONLINE, stamps the current system time and clears the failure counter. */
+    public void setOnline() {
+        setCondition(NodeCondition.ONLINE);
+        setLastHeartBeatTime(System.currentTimeMillis());
+        resetNumFailedLivenessChecks();
+    }
+
+    /** Tells whether the failure counter is strictly greater than the allowed maximum. */
+    public boolean reachMaxNumFailedLivenessChecks() {
+        return maxNumFailedLivenessChecks < numFailedLivenessChecks;
+    }
+
+    /** Increments the failure counter by one. */
+    public void increaseFailedChecks() {
+        numFailedLivenessChecks += 1;
+    }
+
+    /** Marks the node OFFLINE; the failure counter is left untouched. */
+    public void setOffline() {
+        setCondition(NodeCondition.OFFLINE);
+    }
+
+    /** Sets the failure counter back to zero. */
+    public void resetNumFailedLivenessChecks() {
+        setNumFailedLivenessChecks(0);
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resources/ClusterResourceMonitor.java
similarity index 77%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
rename to streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resources/ClusterResourceMonitor.java
index 7b75c19..2d1b2ff 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/resources/ClusterResourceManager.java
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resources/ClusterResourceMonitor.java
@@ -15,11 +15,11 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.node.management.resources;
+package org.apache.streampipes.node.management.operation.monitor.resources;
+
 
 import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.storage.api.INodeInfoStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.apache.streampipes.node.management.utils.StorageUtils;
 
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
@@ -27,19 +27,19 @@ import java.net.Socket;
 import java.net.URL;
 import java.util.List;
 
-public class ClusterResourceManager {
+public class ClusterResourceMonitor {
 
     private static final int RESOURCE_RETRIEVE_FREQUENCY_MS = 60000;
     private static final int SOCKET_TIMEOUT_MS = 500;
-    private static ClusterResourceManager instance = null;
+    private static ClusterResourceMonitor instance = null;
 
-    private ClusterResourceManager() {}
+    private ClusterResourceMonitor() {}
 
-    public static ClusterResourceManager getInstance() {
+    public static ClusterResourceMonitor getInstance() {
         if (instance == null) {
-            synchronized (ClusterResourceManager.class) {
+            synchronized (ClusterResourceMonitor.class) {
                 if (instance == null)
-                    instance = new ClusterResourceManager();
+                    instance = new ClusterResourceMonitor();
             }
         }
         return instance;
@@ -52,7 +52,7 @@ public class ClusterResourceManager {
     private final Runnable getNodes = () -> {
         while (true) {
             try {
-                List<NodeInfoDescription> nodes =  getNodeStorageApi().getAllNodes();
+                List<NodeInfoDescription> nodes =  StorageUtils.persistentNodeAPI().getAllNodes();
                 if (nodes.size() > 0) {
                     nodes.forEach(node -> {
                         try {
@@ -87,10 +87,4 @@ public class ClusterResourceManager {
         }
         return isAlive;
     }
-
-    // Helpers
-
-    private static INodeInfoStorage getNodeStorageApi() {
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage();
-    }
 }
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/relay/RelayHandler.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/relay/RelayHandler.java
new file mode 100644
index 0000000..b154e5e
--- /dev/null
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/relay/RelayHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.management.operation.relay;
+
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.message.NotificationType;
+import org.apache.streampipes.model.message.Notifications;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.node.management.operation.sync.SynchronizationFactory;
+import org.apache.streampipes.node.management.operation.sync.SynchronizationType;
+import org.apache.streampipes.node.management.utils.StorageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * {@link RelayOperation} for a single node: re-synchronizes the relays stored for that node.
+ */
+public class RelayHandler implements RelayOperation {
+    private static final Logger LOG = LoggerFactory.getLogger(RelayHandler.class.getCanonicalName());
+
+    private final NodeInfoDescription node;
+
+    public RelayHandler(NodeInfoDescription node) {
+        this.node = node;
+    }
+
+    /**
+     * Requests a RESTART_RELAYS synchronization for each relay that storage returns for this
+     * node's controller id.
+     *
+     * @return always a NODE_JOIN_SUCCESS message; the individual synchronization results are
+     *         not evaluated
+     */
+    @Override
+    public Message restart() {
+        List<SpDataStreamRelayContainer> relaysOfNode =
+                StorageUtils.getAllRelaysFromNode(node.getNodeControllerId());
+
+        for (SpDataStreamRelayContainer relay : relaysOfNode) {
+            String relayName = relay.getName();
+            String host = node.getHostname();
+            int port = node.getPort();
+
+            LOG.info("Sync active relays name={} to http://{}:{}", relayName, host, port);
+            SynchronizationFactory.synchronize(relay, SynchronizationType.RESTART_RELAYS);
+        }
+        return Notifications.success(NotificationType.NODE_JOIN_SUCCESS);
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/relay/RelayOperation.java
similarity index 82%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
copy to streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/relay/RelayOperation.java
index ae67076..7a12f89 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/relay/RelayOperation.java
@@ -15,8 +15,10 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.node.management.cluster;
+package org.apache.streampipes.node.management.operation.relay;
 
-public enum NodeSyncOptions {
-    ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE,RESTART_RELAYS, HEALTHY;
+import org.apache.streampipes.model.message.Message;
+
+/**
+ * Contract for operations on data stream relays.
+ */
+public interface RelayOperation {
+    /**
+     * Restarts the relays this operation is responsible for.
+     *
+     * @return a {@link Message} describing the result of the operation
+     */
+    Message restart();
 }
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationFactory.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationFactory.java
new file mode 100644
index 0000000..5863f9a
--- /dev/null
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.management.operation.sync;
+
+import org.apache.streampipes.node.management.utils.HttpRequest;
+
+/**
+ * Maps a {@link SynchronizationType} to the node controller route, HTTP method and body flag and runs
+ * the corresponding {@link SynchronizationHandler}.
+ */
+public class SynchronizationFactory {
+
+    private static final String BASE_ROUTE = "/api/v2/node";
+
+    /**
+     * Synchronizes the given element with its node controller.
+     *
+     * @param element object to synchronize; also used by the handler to derive the endpoint
+     * @param type    kind of synchronization to perform
+     * @param <T>     type of the element
+     * @return the result of {@link SynchronizationHandler#synchronize()}, or false for a type that has
+     * no route assigned
+     */
+    public static <T> boolean synchronize(T element, SynchronizationType type) {
+        final String route;
+        final HttpRequest method;
+        final boolean sendElementAsBody;
+
+        switch (type) {
+            case ACTIVATE_NODE:
+                route = BASE_ROUTE + "/info/activate";
+                method = HttpRequest.POST;
+                sendElementAsBody = false;
+                break;
+            case DEACTIVATE_NODE:
+                route = BASE_ROUTE + "/info/deactivate";
+                method = HttpRequest.POST;
+                sendElementAsBody = false;
+                break;
+            case UPDATE_NODE:
+                route = BASE_ROUTE + "/info";
+                method = HttpRequest.PUT;
+                sendElementAsBody = true;
+                break;
+            case RESTART_RELAYS:
+                route = BASE_ROUTE + "/stream/relay/invoke";
+                method = HttpRequest.POST;
+                sendElementAsBody = true;
+                break;
+            default:
+                return false;
+        }
+
+        return new SynchronizationHandler<T>(element, route, method, sendElementAsBody).synchronize();
+    }
+}
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationHandler.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationHandler.java
new file mode 100644
index 0000000..478ec6a
--- /dev/null
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.management.operation.sync;
+
+import org.apache.streampipes.node.management.utils.HttpRequest;
+import org.apache.streampipes.node.management.utils.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends a single element to a node controller endpoint and keeps retrying until the request could be
+ * executed.
+ *
+ * @param <T> type of the element to synchronize
+ */
+public class SynchronizationHandler<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(SynchronizationHandler.class.getCanonicalName());
+
+    // pause between two attempts, in milliseconds
+    private static final long RETRY_INTERVAL_MS = 5000;
+    private final T element;
+    private final String route;
+    private final HttpRequest httpRequest;
+    private final boolean withBody;
+
+    /**
+     * @param element     object to synchronize; also used to derive host and port of the endpoint
+     * @param route       route on the node controller
+     * @param httpRequest HTTP method to use; only {@code POST} and {@code PUT} are supported
+     * @param withBody    true to send the serialized element as body, false to send an empty JSON object
+     */
+    public SynchronizationHandler(T element, String route, HttpRequest httpRequest, boolean withBody) {
+        this.element = element;
+        this.route = route;
+        this.httpRequest = httpRequest;
+        this.withBody = withBody;
+    }
+
+    /**
+     * Executes the request and retries every {@code RETRY_INTERVAL_MS} milliseconds until it could be
+     * executed.
+     *
+     * @return true once the request was executed; false if the HTTP method is not supported or the
+     * calling thread was interrupted while waiting for the next attempt
+     */
+    public boolean synchronize() {
+        // fail fast: any other method would never succeed and the retry loop would spin forever
+        if (httpRequest != HttpRequest.POST && httpRequest != HttpRequest.PUT) {
+            LOG.error("Unsupported HTTP method {} for synchronization", httpRequest);
+            return false;
+        }
+
+        String body = withBody ? HttpUtils.serialize(element) : "{}";
+        String url = HttpUtils.generateEndpoint(element, route);
+        LOG.info("Trying to sync with node controller={}", url);
+
+        while (!send(url, body)) {
+            LOG.debug("Retrying in {} seconds", RETRY_INTERVAL_MS / 1000);
+            try {
+                Thread.sleep(RETRY_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                // restore the interrupt flag and stop retrying so the caller can shut down
+                Thread.currentThread().interrupt();
+                LOG.warn("Interrupted while waiting to retry sync with node controller={}", url);
+                return false;
+            }
+        }
+
+        LOG.info("Successfully synced with node controller={}", url);
+        return true;
+    }
+
+    // Calls the node controller REST endpoint with the configured HTTP method.
+    private boolean send(String url, String body) {
+        return httpRequest == HttpRequest.PUT
+                ? HttpUtils.put(url, body)
+                : HttpUtils.post(url, body);
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationType.java
similarity index 82%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
copy to streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationType.java
index ae67076..225133e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/sync/SynchronizationType.java
@@ -15,8 +15,11 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.node.management.cluster;
+package org.apache.streampipes.node.management.operation.sync;
 
-public enum NodeSyncOptions {
-    ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE,RESTART_RELAYS, HEALTHY;
+/**
+ * Kinds of synchronization that {@code SynchronizationFactory} dispatches to a node controller.
+ */
+public enum SynchronizationType {
+    // POST to /api/v2/node/info/activate, element is not sent as body
+    ACTIVATE_NODE,
+    // POST to /api/v2/node/info/deactivate, element is not sent as body
+    DEACTIVATE_NODE,
+    // PUT to /api/v2/node/info with the serialized element as body
+    UPDATE_NODE,
+    // POST to /api/v2/node/stream/relay/invoke with the serialized element as body
+    RESTART_RELAYS;
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/HttpRequest.java
similarity index 82%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
rename to streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/HttpRequest.java
index ae67076..5054bad 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/NodeSyncOptions.java
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/HttpRequest.java
@@ -15,8 +15,8 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.node.management.cluster;
+package org.apache.streampipes.node.management.utils;
 
-public enum NodeSyncOptions {
-    ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE,RESTART_RELAYS, HEALTHY;
+/**
+ * HTTP request methods. Note that {@code SynchronizationHandler} only issues {@code POST} and
+ * {@code PUT} requests.
+ */
+public enum HttpRequest {
+    GET,POST,PUT,DELETE
 }
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/HttpUtils.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/HttpUtils.java
new file mode 100644
index 0000000..1cfe0a6
--- /dev/null
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/HttpUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.management.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Static helpers for HTTP requests against node controller endpoints and for JSON (de)serialization.
+ */
+public class HttpUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(HttpUtils.class.getCanonicalName());
+
+    private static final String HTTP_PROTOCOL = "http";
+    // connect timeout in milliseconds
+    private static final int CONNECT_TIMEOUT = 1000;
+
+    /**
+     * Executes a GET request and hands back the raw response.
+     *
+     * @param url endpoint to call
+     * @return the unconsumed response; the caller has to consume or discard its content so that the
+     * underlying connection is released
+     * @throws SpRuntimeException if the request fails with an {@link IOException}
+     */
+    public static Response get(String url) {
+        try {
+            return Request.Get(url)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+        } catch (IOException e) {
+            throw new SpRuntimeException("Something went wrong during GET request", e);
+        }
+    }
+
+    /**
+     * Executes a GET request and deserializes the JSON response body.
+     *
+     * @param url   endpoint to call
+     * @param clazz target type of the response body
+     * @param <T>   target type
+     * @return the deserialized response body
+     * @throws SpRuntimeException if the request fails or the body cannot be deserialized
+     */
+    public static <T>T get(String url, Class<T> clazz) {
+        try {
+            String content = Request.Get(url)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute()
+                    .returnContent().asString();
+            return deserialize(content, clazz);
+        } catch (IOException e) {
+            throw new SpRuntimeException("Something went wrong during GET request", e);
+        }
+    }
+
+    /**
+     * Executes a PUT request with a JSON body.
+     *
+     * @param url  endpoint to call
+     * @param body JSON body to send
+     * @return true if the request was executed without an {@link IOException}, false otherwise; the
+     * HTTP status code is not inspected
+     */
+    public static boolean put(String url, String body) {
+        try {
+            // discard the response content so that the underlying connection is released
+            Request.Put(url)
+                    .bodyString(body, ContentType.APPLICATION_JSON)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute()
+                    .discardContent();
+            return true;
+        } catch (IOException e) {
+            LOG.error("Something went wrong during PUT request", e);
+            return false;
+        }
+    }
+
+    /**
+     * Executes a POST request with a JSON body.
+     *
+     * @param url  endpoint to call
+     * @param body JSON body to send
+     * @return true if the request was executed without an {@link IOException}, false otherwise; the
+     * HTTP status code is not inspected
+     */
+    public static boolean post(String url, String body) {
+        try {
+            // discard the response content so that the underlying connection is released
+            Request.Post(url)
+                    .bodyString(body, ContentType.APPLICATION_JSON)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute()
+                    .discardContent();
+            return true;
+        } catch (IOException e) {
+            LOG.error("Something went wrong during POST request", e);
+            return false;
+        }
+    }
+
+    /**
+     * Builds the endpoint URL {@code http://host:port/route} for the node an object belongs to.
+     *
+     * @param object either a {@link NodeInfoDescription} (hostname and port of the node) or a
+     *               {@link SpDataStreamRelayContainer} (hostname and port of its deployment target node)
+     * @param route  route on the node controller; a single leading slash is stripped
+     * @param <T>    type of the object
+     * @return the endpoint URL
+     * @throws SpRuntimeException if the object is null or of an unsupported type, or if host or port
+     *                            are not set
+     */
+    public static <T> String generateEndpoint(T object, String route) {
+        String host = null;
+        int port = -1;
+
+        if (route.startsWith("/")) {
+            route = route.substring(1);
+        }
+
+        if (object instanceof NodeInfoDescription) {
+            NodeInfoDescription node = (NodeInfoDescription) object;
+            host = node.getHostname();
+            port = node.getPort();
+        } else if (object instanceof SpDataStreamRelayContainer) {
+            SpDataStreamRelayContainer relay = (SpDataStreamRelayContainer) object;
+            host = relay.getDeploymentTargetNodeHostname();
+            port = relay.getDeploymentTargetNodePort();
+        } else {
+            // null-safe: report a null argument the same way as an unsupported type
+            throw new SpRuntimeException("Object class not supported "
+                    + (object == null ? "null" : object.getClass()));
+        }
+
+        if (host != null && port != -1) {
+            return String.format("%s://%s:%s/%s", HTTP_PROTOCOL, host, port, route);
+        } else {
+            throw new SpRuntimeException("Could not generate endpoint");
+        }
+    }
+
+    /**
+     * Serializes an object to JSON.
+     *
+     * @param object object to serialize
+     * @param <T>    type of the object
+     * @return JSON representation of the object
+     * @throws SpRuntimeException if serialization fails; the original exception is kept as cause
+     */
+    public static <T> String serialize(T object) {
+        try {
+            return JacksonSerializer.getObjectMapper().writeValueAsString(object);
+        } catch (JsonProcessingException e) {
+            throw new SpRuntimeException("Could not serialize object", e);
+        }
+    }
+
+    /**
+     * Deserializes a JSON string.
+     *
+     * @param objectString JSON string
+     * @param clazz        target type
+     * @param <T>          target type
+     * @return the deserialized object
+     * @throws SpRuntimeException if deserialization fails; the original exception is kept as cause
+     */
+    public static <T>T deserialize(String objectString, Class<T> clazz) {
+        try {
+            return JacksonSerializer
+                    .getObjectMapper()
+                    .readValue(objectString, clazz);
+        } catch (JsonProcessingException e) {
+            throw new SpRuntimeException("Could not deserialize object", e);
+        }
+    }
+}
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/StorageUtils.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/StorageUtils.java
new file mode 100644
index 0000000..922f334
--- /dev/null
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/utils/StorageUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.management.utils;
+
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.storage.api.INodeDataStreamRelay;
+import org.apache.streampipes.storage.api.INodeInfoStorage;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Static shortcuts to the NoSQL storage interfaces for nodes, data stream relays and pipelines.
+ */
+public class StorageUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(StorageUtils.class.getCanonicalName());
+
+    /** @return storage interface for node descriptions */
+    public static INodeInfoStorage persistentNodeAPI() {
+        return StorageDispatcher.INSTANCE
+                .getNoSqlStore()
+                .getNodeStorage();
+    }
+
+    /** @return storage interface for data stream relays */
+    public static INodeDataStreamRelay persistentRelayAPI() {
+        return StorageDispatcher.INSTANCE
+                .getNoSqlStore()
+                .getNodeDataStreamRelayStorage();
+    }
+
+    /** @return storage interface for pipelines */
+    public static IPipelineStorage persistentPipelineAPI() {
+        return StorageDispatcher.INSTANCE
+                .getNoSqlStore()
+                .getPipelineStorageAPI();
+    }
+
+    /** Stores the given node description. */
+    public static void storeNode(NodeInfoDescription node) {
+        INodeInfoStorage nodeStorage = persistentNodeAPI();
+        nodeStorage.storeNode(node);
+    }
+
+    /** @return the node stored under the given node controller id, as returned by the node storage */
+    public static Optional<NodeInfoDescription> getNode(String nodeControllerId) {
+        INodeInfoStorage nodeStorage = persistentNodeAPI();
+        return nodeStorage.getNode(nodeControllerId);
+    }
+
+    /** @return all stored node descriptions */
+    public static List<NodeInfoDescription> getAllNodes() {
+        return persistentNodeAPI().getAllNodes();
+    }
+
+    /**
+     * Searches all stored nodes for one with the given node controller id.
+     *
+     * @param nodeControllerId id to look for
+     * @return a matching node, or an empty Optional if none matches
+     */
+    public static Optional<NodeInfoDescription> getLatestNodeOrElseEmpty(String nodeControllerId) {
+        for (NodeInfoDescription candidate : persistentNodeAPI().getAllNodes()) {
+            if (candidate.getNodeControllerId().equals(nodeControllerId)) {
+                return Optional.of(candidate);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /** Updates the stored description of the given node. */
+    public static void updateNode(NodeInfoDescription node) {
+        INodeInfoStorage nodeStorage = persistentNodeAPI();
+        nodeStorage.updateNode(node);
+    }
+
+    /** Deletes the node stored under the given node controller id. */
+    public static void deleteNode(String nodeControllerId) {
+        INodeInfoStorage nodeStorage = persistentNodeAPI();
+        nodeStorage.deleteNode(nodeControllerId);
+    }
+
+    /** @return all relays stored for the given node controller id */
+    public static List<SpDataStreamRelayContainer> getAllRelaysFromNode(String nodeControllerId) {
+        INodeDataStreamRelay relayStorage = persistentRelayAPI();
+        return relayStorage.getAllByNodeControllerId(nodeControllerId);
+    }
+
+    /** Marks the node with the given node controller id as activated in the node storage. */
+    public static void activateNode(String nodeControllerId) {
+        INodeInfoStorage nodeStorage = persistentNodeAPI();
+        nodeStorage.activateNode(nodeControllerId);
+    }
+
+    /** Marks the node with the given node controller id as deactivated in the node storage. */
+    public static void deactivateNode(String nodeControllerId) {
+        INodeInfoStorage nodeStorage = persistentNodeAPI();
+        nodeStorage.deactivateNode(nodeControllerId);
+    }
+}
diff --git a/streampipes-pipeline-management/pom.xml b/streampipes-pipeline-management/pom.xml
index 5cf9721..22f2abc 100644
--- a/streampipes-pipeline-management/pom.xml
+++ b/streampipes-pipeline-management/pom.xml
@@ -117,6 +117,11 @@
             <artifactId>streampipes-user-management</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-node-management</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
 
         <!-- External dependencies -->
         <dependency>
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
index c770121..2853a14 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
@@ -21,7 +21,7 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphHelpers;
 import org.apache.streampipes.manager.execution.http.GraphSubmitter;
-import org.apache.streampipes.manager.node.StreamPipesClusterManager;
+
 import org.apache.streampipes.manager.util.TemporaryGraphStorage;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
@@ -33,9 +33,11 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.storage.api.INodeDataStreamRelay;
 import org.apache.streampipes.storage.api.IPipelineStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.user.management.encryption.CredentialsManager;
@@ -85,15 +87,18 @@ public abstract class AbstractPipelineExecutor {
     }
 
     protected void storeDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
-        relays.forEach(StreamPipesClusterManager::persistDataStreamRelay);
+        //relays.forEach(StreamPipesClusterManager::persistDataStreamRelay);
+        relays.forEach(relay -> getDataStreamRelayApi().addRelayContainer(relay));
     }
 
     protected void deleteDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
-        relays.forEach(StreamPipesClusterManager::deleteDataStreamRelay);
+        //relays.forEach(StreamPipesClusterManager::deleteDataStreamRelay);
+        relays.forEach(relay -> getDataStreamRelayApi().deleteRelayContainer(relay));
     }
 
     protected void updateDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
-        relays.forEach(StreamPipesClusterManager::updateDataStreamRelay);
+        //relays.forEach(StreamPipesClusterManager::updateDataStreamRelay);
+        relays.forEach(relay -> getDataStreamRelayApi().updateRelayContainer(relay));
     }
 
 
@@ -125,6 +130,70 @@ public abstract class AbstractPipelineExecutor {
                 relays).detachRelaysOnMigration();
     }
 
+    /**
+     * Collects relay containers for the predecessors of a target element that are deployed on a
+     * different node than the target.
+     *
+     * @param predecessors direct predecessors of the target (data processors or data streams)
+     * @param target       pipeline element the predecessors are compared against
+     * @return the relay containers found; empty if no predecessor qualifies
+     */
+    protected List<SpDataStreamRelayContainer> findRelaysWhenStopping(List<NamedStreamPipesEntity> predecessors,
+                                                          InvocableStreamPipesEntity target){
+
+        List<SpDataStreamRelayContainer> relays = new ArrayList<>();
+
+        predecessors.forEach(pred -> {
+            List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
+            // NOTE(review): this initial instance is never used; it is replaced before it is added below
+            SpDataStreamRelayContainer relayContainer = new SpDataStreamRelayContainer();
+
+            if (pred instanceof DataProcessorInvocation){
+                //Data Processor
+                DataProcessorInvocation graph = (DataProcessorInvocation) pred;
+                if (differentDeploymentTargets(pred, target)) {
+
+                    // TODO only add if no other processor or sink depends on relay
+                    String predDOMId = pred.getDOM();
+                    String targetRunningInstanceId = target.getDeploymentRunningInstanceId();
+                    // another processor connected to the predecessor, excluding the target itself
+                    Optional<DataProcessorInvocation> foundProcessor = pipeline.getSepas().stream()
+                            .filter(processor -> processor.getConnectedTo().contains(predDOMId))
+                            .filter(processor -> !processor.getDeploymentRunningInstanceId().equals(targetRunningInstanceId))
+                            .findAny();
+
+                    // any sink connected to the predecessor (the target is not excluded here)
+                    Optional<DataSinkInvocation> foundSink = pipeline.getActions().stream()
+                            .filter(action -> action.getConnectedTo().contains(predDOMId))
+                            .findAny();
+
+                    boolean foundDependencyOnDifferentTarget = false;
+                    if (foundProcessor.isPresent()) {
+                        foundDependencyOnDifferentTarget =  differentDeploymentTargets(foundProcessor.get(), target);
+                    }
+
+                    // NOTE(review): this overwrites the result of the processor check above instead of
+                    // combining both results - confirm whether that is intended
+                    if (foundSink.isPresent()) {
+                        foundDependencyOnDifferentTarget =  differentDeploymentTargets(foundSink.get(), target);
+                    }
+
+                    if (foundDependencyOnDifferentTarget) {
+                        dataStreamRelays.addAll(findRelaysWithMatchingTopic(graph, target));
+
+                        relayContainer = new SpDataStreamRelayContainer(graph);
+                        relayContainer.setOutputStreamRelays(dataStreamRelays);
+
+                        relays.add(relayContainer);
+                    }
+
+                }
+            } else if (pred instanceof SpDataStream){
+                //DataStream
+                SpDataStream stream = (SpDataStream) pred;
+                if (differentDeploymentTargets(stream, target)){
+
+                    String id = extractUniqueAdpaterId(stream.getElementId());
+                    //There is a relay that needs to be removed
+                    // the relay uses a copy of the event grounding of the target's matching input stream
+                    dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
+                            .get(getIndex(pred.getDOM(), target))
+                            .getEventGrounding())));
+                    String relayStrategy = pipeline.getEventRelayStrategy();
+                    relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
+                }
+            }
+        });
+        return relays;
+    }
+
+
     protected List<SpDataStreamRelayContainer> findRelays(List<NamedStreamPipesEntity> predecessors,
                                                           InvocableStreamPipesEntity target){
 
@@ -138,13 +207,23 @@ public abstract class AbstractPipelineExecutor {
                 //Data Processor
                 DataProcessorInvocation graph = (DataProcessorInvocation) pred;
                 if (differentDeploymentTargets(pred, target)) {
-                    dataStreamRelays.addAll(findRelaysWithMatchingTopic(graph, target));
 
-                    //dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
-                    relayContainer = new SpDataStreamRelayContainer(graph);
-                    relayContainer.setOutputStreamRelays(dataStreamRelays);
+                    String runningRelayId = ((DataProcessorInvocation) pred).getDeploymentRunningInstanceId();
+                    Optional<SpDataStreamRelayContainer> existingRelay = getRelayContainerById(runningRelayId);
+
+                    // only add relay if not existing - prevent from duplicate relays with same topic to same target
+                    Collection<? extends SpDataStreamRelay> foundRelays = findRelaysWithMatchingTopic(graph, target);
+
+                    if (!existingRelay.isPresent() || missingRelayToTarget(existingRelay.get(), foundRelays)) {
+                        dataStreamRelays.addAll(findRelaysWithMatchingTopic(graph, target));
+
+                        //dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
+                        relayContainer = new SpDataStreamRelayContainer(graph);
+                        relayContainer.setOutputStreamRelays(dataStreamRelays);
+
+                        relays.add(relayContainer);
+                    }
 
-                    relays.add(relayContainer);
                 }
             } else if (pred instanceof SpDataStream){
                 //DataStream
@@ -154,7 +233,7 @@ public abstract class AbstractPipelineExecutor {
                     String id = extractUniqueAdpaterId(stream.getElementId());
                     Optional<SpDataStreamRelayContainer> existingRelay = getRelayContainerById(id);
 
-                    // only add relay if not existing
+                    // only add relay if not existing - prevent from duplicate relays with same topic
                     if(!existingRelay.isPresent()) {
                         //There is a relay that needs to be removed
                         dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
@@ -162,6 +241,12 @@ public abstract class AbstractPipelineExecutor {
                                 .getEventGrounding())));
                         String relayStrategy = pipeline.getEventRelayStrategy();
                         relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
+                    } else {
+                        // generate relays for adapter streams to remote processors
+                        List<SpDataStreamRelayContainer> generatedRelays =
+                                generateDataStreamRelays(Collections.singletonList(target));
+
+                        relays.addAll(generatedRelays);
                     }
                 }
             }
@@ -169,6 +254,36 @@ public abstract class AbstractPipelineExecutor {
         return relays;
     }
 
+    /**
+     * Checks whether none of the given relays is already part of the existing relay container. Two
+     * relays are considered the same if their transport protocols share the broker hostname and the
+     * actual topic name.
+     *
+     * @param existingRelayContainer relay container whose output stream relays are inspected
+     * @param foundRelays            relays to look for in the existing container
+     * @return false if at least one pair matches on broker hostname and topic, true otherwise
+     */
+    private boolean missingRelayToTarget(SpDataStreamRelayContainer existingRelayContainer,
+                                         Collection<? extends SpDataStreamRelay> foundRelays) {
+
+        List<TransportProtocol> wantedProtocols = new ArrayList<>();
+        for (SpDataStreamRelay found : foundRelays) {
+            wantedProtocols.add(found.getEventGrounding().getTransportProtocol());
+        }
+
+        List<TransportProtocol> presentProtocols = new ArrayList<>();
+        for (SpDataStreamRelay existing : existingRelayContainer.getOutputStreamRelays()) {
+            presentProtocols.add(existing.getEventGrounding().getTransportProtocol());
+        }
+
+        // every pair is compared; a single match is enough to report the relay as present
+        boolean matched = false;
+        for (TransportProtocol wanted : wantedProtocols) {
+            for (TransportProtocol present : presentProtocols) {
+                String wantedTopic = wanted.getTopicDefinition().getActualTopicName();
+                String presentTopic = present.getTopicDefinition().getActualTopicName();
+                String wantedHost = wanted.getBrokerHostname();
+                String presentHost = present.getBrokerHostname();
+
+                if (wantedHost.equals(presentHost) && wantedTopic.equals(presentTopic)) {
+                    matched = true;
+                }
+            }
+        }
+
+        return !matched;
+    }
+
     protected List<NamedStreamPipesEntity> getPredecessors(NamedStreamPipesEntity source,
                                                            InvocableStreamPipesEntity target,
                                                            PipelineGraph pipelineGraph,
@@ -251,6 +366,9 @@ public abstract class AbstractPipelineExecutor {
             for (DataProcessorInvocation processor : pipeline.getSepas()) {
                 if (differentDeploymentTargets(processor, graph) && connected(processor, graph)) {
                     if (!relayExists(relays, processor.getDeploymentRunningInstanceId())) {
+//                        String previousId = processor.getDeploymentRunningInstanceId();
+//                        String modifiedId = previousId + "-" + processor.getDeploymentTargetNodeId();
+//                        processor.setDeploymentRunningInstanceId(modifiedId);
                         SpDataStreamRelayContainer processorRelay = new SpDataStreamRelayContainer(processor);
                         relays.add(processorRelay);
                     }
@@ -341,6 +459,15 @@ public abstract class AbstractPipelineExecutor {
     }
 
     /**
+     * Get data stream relay storage dispatcher API
+     *
+     * @return INodeDataStreamRelay NoSQL storage interface for data stream relays
+     */
+    private INodeDataStreamRelay getDataStreamRelayApi() {
+        // resolve the relay storage through the shared storage dispatcher
+        return StorageDispatcher.INSTANCE
+                .getNoSqlStore()
+                .getNodeDataStreamRelayStorage();
+    }
+
+    /**
      * Extract topic name
      *
      * @param entity
@@ -363,15 +490,17 @@ public abstract class AbstractPipelineExecutor {
      * Compare deployment targets of two pipeline elements, namely data stream/processor (source) and data
      * processor/sink (target)
      *
-     * @param source    data stream/processor
-     * @param target    data processor/sink
+     * @param e1    source element: data stream, data processor or data sink
+     * @param e2    target element: data processor/sink
      * @return boolean value that returns true if source and target share the same deployment target, else false
      */
-    private boolean differentDeploymentTargets(NamedStreamPipesEntity source, InvocableStreamPipesEntity target) {
-        if (source instanceof SpDataStream) {
-            return !((SpDataStream) source).getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
-        } else if (source instanceof DataProcessorInvocation) {
-            return !((DataProcessorInvocation) source).getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
+    private boolean differentDeploymentTargets(NamedStreamPipesEntity e1, InvocableStreamPipesEntity e2) {
+        if (e1 instanceof SpDataStream) {
+            return !((SpDataStream) e1).getDeploymentTargetNodeId().equals(e2.getDeploymentTargetNodeId());
+        } else if (e1 instanceof DataProcessorInvocation) {
+            return !((DataProcessorInvocation) e1).getDeploymentTargetNodeId().equals(e2.getDeploymentTargetNodeId());
+        } else if (e1 instanceof DataSinkInvocation) {
+            return !((DataSinkInvocation) e1).getDeploymentTargetNodeId().equals(e2.getDeploymentTargetNodeId());
         }
         throw new SpRuntimeException("Matching deployment targets check failed");
     }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index 49cf71f..f5ead26 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -128,8 +128,8 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
 
         // store new pipeline and relays
         storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-        storeDataStreamRelayContainer(relaysToBePersisted);
         deleteDataStreamRelayContainer(relaysToBeDeleted);
+        storeDataStreamRelayContainer(relaysToBePersisted);
 
         // set global status
         status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
@@ -243,7 +243,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
     }
 
     private PipelineOperationStatus stopRelaysFromPredecessorsBeforeMigration(PipelineOperationStatus status) {
-        List<SpDataStreamRelayContainer> relays = findRelays(predecessorsBeforeMigration,
+        List<SpDataStreamRelayContainer> relays = findRelaysWhenStopping(predecessorsBeforeMigration,
                 migrationEntity.getSourceElement());
 
         updateRelaysToBeDeleted(relays);
@@ -343,6 +343,11 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
     private List<SpDataStreamRelayContainer> extractRelaysFromDataProcessor(List<InvocableStreamPipesEntity> graphs) {
         return graphs.stream()
                 .map(DataProcessorInvocation.class::cast)
+//                .map(p -> {
+//                    String modifiedId = p.getDeploymentRunningInstanceId() + "-" + p.getDeploymentTargetNodeId();
+//                    p.setDeploymentRunningInstanceId(modifiedId);
+//                    return p;
+//                })
                 .map(SpDataStreamRelayContainer::new)
                 .collect(Collectors.toList());
     }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index 37e5f8f..1766227 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -48,10 +48,6 @@ import java.util.stream.Collectors;
 
 public class InvocationGraphBuilder {
 
-  private static final String CONSUL_NODE_CONTROLLER_ROUTE = "sp/v1/node/org.apache.streampipes.node.controller";
-  private static final String NODE_BROKER_CONTAINER_HOST = "SP_NODE_BROKER_CONTAINER_HOST";
-  private static final String NODE_BROKER_CONTAINER_PORT = "SP_NODE_BROKER_CONTAINER_PORT";
-  private static final String SLASH ="/";
   private static final String DEFAULT_TAG = "default";
   private final PipelineGraph pipelineGraph;
   private final String pipelineId;
@@ -206,11 +202,13 @@ public class InvocationGraphBuilder {
     if(edgeToEdgeRelay){
       inputStream.setEventGrounding(grounding);
     }else{
-      inputStream.getEventGrounding()
-              .getTransportProtocol()
-              .getTopicDefinition()
-              .setActualTopicName(extractTopic(grounding));
-      inputStream.getEventGrounding().setElementId(grounding.getElementId());
+      inputStream.setEventGrounding(grounding);
+
+//      inputStream.getEventGrounding()
+//              .getTransportProtocol()
+//              .getTopicDefinition()
+//              .setActualTopicName(extractTopic(grounding));
+//      inputStream.getEventGrounding().setElementId(grounding.getElementId());
     }
   }
 
@@ -266,18 +264,10 @@ public class InvocationGraphBuilder {
     return outputStream;
   }
 
-  private String getTargetNodeBrokerHost(InvocableStreamPipesEntity t) {
-    Optional<NodeInfoDescription> targetNode = getNodeStorageApi().getNode(t.getDeploymentTargetNodeId());
-    if (targetNode.isPresent()) {
-      return targetNode.get().getHostname();
-    }
-    throw new SpRuntimeException("Deployment target node not found");
-  }
-
-  private int getTargetNodeBrokerPort(InvocableStreamPipesEntity t) {
+  private TransportProtocol getTargetNodeTransportProtocol(InvocableStreamPipesEntity t) {
     Optional<NodeInfoDescription> targetNode = getNodeStorageApi().getNode(t.getDeploymentTargetNodeId());
     if (targetNode.isPresent()) {
-      return targetNode.get().getPort();
+      return targetNode.get().getNodeBroker().getNodeTransportProtocol();
     }
     throw new SpRuntimeException("Deployment target node not found");
   }
@@ -426,12 +416,9 @@ public class InvocationGraphBuilder {
 
   private KafkaTransportProtocol kafkaTransportProtocol(String topic, InvocableStreamPipesEntity target) {
     if (target != null) {
-      return new KafkaTransportProtocol(
-              getTargetNodeBrokerHost(target),
-              getTargetNodeBrokerPort(target),
-              topic);
-//              getTargetNodeZookeeperHost(target),
-//              getTargetNodeZookeeperPort(target));
+      KafkaTransportProtocol ktp = (KafkaTransportProtocol) getTargetNodeTransportProtocol(target);
+      ktp.setTopicDefinition(new SimpleTopicDefinition(topic));
+      return ktp;
     }
     return new KafkaTransportProtocol(
             BackendConfig.INSTANCE.getKafkaHost(),
@@ -447,10 +434,9 @@ public class InvocationGraphBuilder {
 
   private MqttTransportProtocol mqttTransportProtocol(String topic, InvocableStreamPipesEntity target) {
     if (target != null) {
-      return new MqttTransportProtocol(
-              getTargetNodeBrokerHost(target),
-              getTargetNodeBrokerPort(target),
-              topic);
+      MqttTransportProtocol mtp = (MqttTransportProtocol) getTargetNodeTransportProtocol(target);
+      mtp.setTopicDefinition(new SimpleTopicDefinition(topic));
+      return mtp;
     }
     return new MqttTransportProtocol(
             BackendConfig.INSTANCE.getMqttHost(),
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index b8a688e..bfb209c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@ -18,11 +18,11 @@
 package org.apache.streampipes.manager.migration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.streampipes.manager.node.StreamPipesClusterManager;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.node.management.NodeManagement;
 
 
 import java.util.ArrayList;
@@ -35,7 +35,7 @@ public class MigrationPipelineGenerator {
     public static Pipeline generateMigrationPipeline(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
 
         List<NodeInfoDescription> possibleTargetNodes = new ArrayList<>();
-        List<NodeInfoDescription> nodeInfo = StreamPipesClusterManager.getAllActiveAndHealthyNodes();
+        List<NodeInfoDescription> nodeInfo = NodeManagement.getOnlineNodes();
         nodeInfo.forEach(desc ->{
             if(desc.getSupportedElements().stream().anyMatch(element -> element.equals(entityToMigrate.getAppId()))
                 && !desc.getNodeControllerId().equals(entityToMigrate.getDeploymentTargetNodeId()))
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementOffloadHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementOffloadHandler.java
new file mode 100644
index 0000000..01ade43
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementOffloadHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.migration;
+
+import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.message.NotificationType;
+import org.apache.streampipes.model.message.Notifications;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+/** Handles an offload request for one pipeline element by executing a generated migration pipeline. */
+public class PipelineElementOffloadHandler {
+    private final InvocableStreamPipesEntity graph;
+
+    public PipelineElementOffloadHandler(InvocableStreamPipesEntity elementToOffload) {
+        this.graph = elementToOffload;
+    }
+
+    /** Runs the offload migration; returns a success message only when the migration status is successful. */
+    public Message handleOffloading() {
+        Pipeline currentPipeline = getPipelineById(graph.getCorrespondingPipeline());
+        Pipeline offloadPipeline = MigrationPipelineGenerator.generateMigrationPipeline(graph, currentPipeline);
+        //TODO: Handle this case properly
+        if (offloadPipeline == null) {
+            return Notifications.error(NotificationType.UNKNOWN_ERROR);
+        }
+
+        try {
+            PipelineOperationStatus status = Operations.handlePipelineElementMigration(offloadPipeline,
+                    true, true, true);
+            if (status.isSuccess()) {
+                return Notifications.success(NotificationType.OFFLOADING_SUCCESS);
+            }
+            // A failed migration is an error: do not wrap OFFLOADING_ERROR in a success message.
+            return Notifications.error(NotificationType.OFFLOADING_ERROR);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return Notifications.error(NotificationType.UNKNOWN_ERROR);
+        }
+    }
+
+    // Helpers
+    private Pipeline getPipelineById(String pipelineId) {
+        return getPipelineStorageApi().getPipeline(pipelineId);
+    }
+
+    private IPipelineStorage getPipelineStorageApi() {
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
deleted file mode 100644
index 3ab464b..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/StreamPipesClusterManager.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.node;
-
-import org.apache.streampipes.manager.migration.MigrationPipelineGenerator;
-import org.apache.streampipes.manager.node.management.cluster.AbstractClusterManager;
-import org.apache.streampipes.manager.node.management.cluster.NodeSyncOptions;
-import org.apache.streampipes.manager.node.management.healthcheck.ClusterHealthCheck;
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.message.Message;
-import org.apache.streampipes.model.message.NotificationType;
-import org.apache.streampipes.model.message.Notifications;
-import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.storage.api.INodeDataStreamRelay;
-import org.apache.streampipes.storage.api.INodeInfoStorage;
-import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public class StreamPipesClusterManager extends AbstractClusterManager {
-    private static final Logger LOG = LoggerFactory.getLogger(StreamPipesClusterManager.class.getCanonicalName());
-
-    public static List<NodeInfoDescription> getAllActiveAndHealthyNodes() {
-        List<NodeInfoDescription> activeAndHealthyNodes = new ArrayList<>();
-        getNodeStorageApi().getAllActiveNodes().forEach(node -> {
-            if (ClusterHealthCheck.check(node)) {
-                activeAndHealthyNodes.add(node);
-            }
-        });
-        return activeAndHealthyNodes;
-    }
-
-    public static List<NodeInfoDescription> getAllNodes() {
-        return getNodeStorageApi().getAllNodes();
-    }
-
-    public static Message updateNode(NodeInfoDescription desc) {
-        boolean successfullyUpdated = syncWithNodeController(desc, NodeSyncOptions.UPDATE_NODE);
-        if (successfullyUpdated) {
-            getNodeStorageApi().updateNode(desc);
-            return Notifications.success("Node updated");
-        }
-        return Notifications.error("Could not update node");
-    }
-
-    public static Message syncRemoteNodeUpdateRequest(NodeInfoDescription desc) {
-        getNodeStorageApi().updateNode(desc);
-        return Notifications.success("Node updated");
-    }
-
-    public static boolean deactivateNode(String nodeControllerId) {
-        Optional<NodeInfoDescription> storedNode = getNodeStorageApi().getNode(nodeControllerId);
-        boolean status = false;
-        if (storedNode.isPresent()) {
-            getNodeStorageApi().deactivateNode(nodeControllerId);
-            status = syncWithNodeController(storedNode.get(), NodeSyncOptions.DEACTIVATE_NODE);
-        }
-        return status;
-    }
-
-    public static boolean activateNode(String nodeControllerId) {
-        Optional<NodeInfoDescription> storedNode = getNodeStorageApi().getNode(nodeControllerId);
-        boolean status = false;
-        if (storedNode.isPresent()) {
-            getNodeStorageApi().activateNode(nodeControllerId);
-            status = syncWithNodeController(storedNode.get(), NodeSyncOptions.ACTIVATE_NODE);
-        }
-        return status;
-    }
-
-    public static Message addOrRejoin(NodeInfoDescription desc) {
-        Optional<NodeInfoDescription> latestDesc = getLatestNodeOrElseEmpty(desc.getNodeControllerId());
-
-        boolean alreadyRegistered = false;
-        if (latestDesc.isPresent()) {
-            alreadyRegistered = true;
-        }
-
-        if (!alreadyRegistered) {
-            LOG.info("New cluster node join request from http://{}:{}", desc.getHostname(), desc.getPort());
-            return addNewNode(desc);
-        } else {
-            LOG.info("Re-joined cluster node from http://{}:{}", desc.getHostname(), desc.getPort());
-            return rejoinAndSyncNode(latestDesc.get());
-        }
-    }
-
-    private static Optional<NodeInfoDescription> getLatestNodeOrElseEmpty(String nodeControllerId) {
-        return getNodeStorageApi().getAllNodes().stream()
-                .filter(n -> n.getNodeControllerId().equals(nodeControllerId))
-                .findAny();
-    }
-
-    private static Message addNewNode(NodeInfoDescription desc) throws RuntimeException {
-        try {
-            getNodeStorageApi().storeNode(desc);
-            LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
-            return Notifications.success(NotificationType.NODE_JOIN_SUCCESS);
-        } catch (Exception e) {
-            return Notifications.success(NotificationType.NODE_JOIN_ERROR);
-        }
-    }
-
-    private static Message rejoinAndSyncNode(NodeInfoDescription desc) {
-        LOG.info("Sync latest node description to http://{}:{}", desc.getHostname(), desc.getPort());
-        boolean success = syncWithNodeController(desc, NodeSyncOptions.UPDATE_NODE);
-        if (success) {
-            return restartRelays(desc);
-        }
-        return Notifications.success(NotificationType.NODE_JOIN_ERROR);
-    }
-
-    private static Message restartRelays(NodeInfoDescription desc) {
-        List<SpDataStreamRelayContainer> runningRelays = getDataStreamRelay(desc.getNodeControllerId());
-        if (runningRelays.size() > 0) {
-            runningRelays.forEach(relay -> {
-                LOG.info("Sync active relays name={} to http://{}:{}", relay.getName(), desc.getHostname(),
-                        desc.getPort());
-                syncWithNodeController(relay, NodeSyncOptions.RESTART_RELAYS);
-            });
-        }
-        return Notifications.success(NotificationType.NODE_JOIN_SUCCESS);
-    }
-
-    public static void deleteNode(String nodeControllerId) {
-        getNodeStorageApi().deleteNode(nodeControllerId);
-    }
-
-
-    public static void persistDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
-        getNodeDataStreamRelayStorageApi().addRelayContainer(relayContainer);
-    }
-
-    public static List<SpDataStreamRelayContainer> getDataStreamRelay(String nodeControllerId) {
-        return getNodeDataStreamRelayStorageApi().getAllByNodeControllerId(nodeControllerId);
-    }
-
-    public static void updateDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
-        getNodeDataStreamRelayStorageApi().updateRelayContainer(relayContainer);
-    }
-
-    public static void deleteDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
-        getNodeDataStreamRelayStorageApi().deleteRelayContainer(relayContainer);
-    }
-
-    public static Message handleOffloadRequest(InvocableStreamPipesEntity elementToMigrate) {
-        Pipeline currentPipeline = getPipelineStorageApi().getPipeline(elementToMigrate.getCorrespondingPipeline());
-        Pipeline offloadPipeline = MigrationPipelineGenerator.generateMigrationPipeline(elementToMigrate,
-                currentPipeline);
-        //TODO: Handle this case properly
-        if(offloadPipeline == null)
-            return Notifications.error(NotificationType.UNKNOWN_ERROR);
-
-        try {
-            PipelineOperationStatus status = Operations.handlePipelineElementMigration(offloadPipeline,
-                    true, true, true);
-            if (status.isSuccess()) {
-                return Notifications.success(NotificationType.OFFLOADING_SUCCESS);
-            } else {
-                return Notifications.success(NotificationType.OFFLOADING_ERROR);
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            return Notifications.error(NotificationType.UNKNOWN_ERROR);
-        }
-    }
-
-    // Helpers
-
-    private static INodeInfoStorage getNodeStorageApi() {
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage();
-    }
-
-    private static INodeDataStreamRelay getNodeDataStreamRelayStorageApi(){
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeDataStreamRelayStorage();
-    }
-
-    private static IPipelineStorage getPipelineStorageApi() {
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
-    }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/AbstractClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/AbstractClusterManager.java
deleted file mode 100644
index 36c7a2b..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/AbstractClusterManager.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.node.management.cluster;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-
-public abstract class AbstractClusterManager {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterManager.class.getCanonicalName());
-
-    private static final String PROTOCOL = "http://";
-    private static final String COLON = ":";
-    private static final long RETRY_INTERVAL_MS = 5000;
-    private static final int CONNECT_TIMEOUT = 1000;
-
-    public enum RequestOptions {
-        GET,POST,PUT,DELETE
-    }
-
-    protected static <T> boolean syncWithNodeController(T element, NodeSyncOptions sync) {
-        switch (sync) {
-            case ACTIVATE_NODE:
-                return sync(element, "/api/v2/node/info/activate", RequestOptions.POST, false);
-            case DEACTIVATE_NODE:
-                return sync(element, "/api/v2/node/info/deactivate", RequestOptions.POST, false);
-            case UPDATE_NODE:
-                return sync(element, "/api/v2/node/info", RequestOptions.PUT, true);
-            case RESTART_RELAYS:
-                return sync(element, "/api/v2/node/stream/relay/invoke", RequestOptions.POST, true);
-            default:
-                return false;
-        }
-    }
-
-    private static <T> boolean sync(T element, String route, RequestOptions request, boolean withBody) {
-        boolean synced = false;
-
-        String body = "{}";
-        if (withBody) {
-            body = jackson(element);
-        }
-
-        String url = generateEndpoint(element, route);
-        LOG.info("Trying to sync with node controller=" + url);
-
-        boolean connected = false;
-        while (!connected) {
-            // call node controller REST endpoints
-            switch (request) {
-                case POST:
-                    connected = post(url, body);
-                    break;
-                case PUT :
-                    connected = put(url, body);
-                    break;
-            }
-
-            if (!connected) {
-                LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 10000));
-                try {
-                    Thread.sleep(RETRY_INTERVAL_MS);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-            synced = true;
-        }
-        LOG.info("Successfully synced with node controller=" + url);
-        return synced;
-    }
-
-    // Helpers
-
-    private static <T> String generateEndpoint(T desc, String route) {
-        if (desc instanceof NodeInfoDescription) {
-            NodeInfoDescription d = (NodeInfoDescription) desc;
-            return PROTOCOL
-                    + d.getHostname()
-                    + COLON
-                    + d.getPort()
-                    + route;
-        } else {
-            SpDataStreamRelayContainer d = (SpDataStreamRelayContainer) desc;
-            return PROTOCOL
-                    + d.getDeploymentTargetNodeHostname()
-                    + COLON
-                    + d.getDeploymentTargetNodePort()
-                    + route;
-        }
-    }
-
-    private static <T> String jackson(T desc) {
-        try {
-            return JacksonSerializer.getObjectMapper().writeValueAsString(desc);
-        } catch (JsonProcessingException e) {
-            throw new SpRuntimeException("Could not serialize node controller description");
-        }
-    }
-
-    private static String get(String url) {
-        try {
-            return Request.Get(url)
-                    .connectTimeout(CONNECT_TIMEOUT)
-                    .execute().returnContent().toString();
-        } catch (IOException e) {
-            throw new SpRuntimeException("Something went wrong during GET request to node controller", e);
-        }
-    }
-
-    private static boolean put(String url, String body) {
-        try {
-            Request.Put(url)
-                    .bodyString(body, ContentType.APPLICATION_JSON)
-                    .connectTimeout(CONNECT_TIMEOUT)
-                    .execute();
-            return true;
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return false;
-    }
-
-    private static boolean post(String url, String body) {
-        try {
-            Request.Post(url)
-                    .bodyString(body, ContentType.APPLICATION_JSON)
-                    .connectTimeout(CONNECT_TIMEOUT)
-                    .execute();
-            return true;
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return false;
-    }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/AvailableNodesFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/AvailableNodesFetcher.java
deleted file mode 100644
index e0c42a8..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/cluster/AvailableNodesFetcher.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.node.management.cluster;
-
-import org.apache.http.client.fluent.Request;
-import org.apache.streampipes.container.util.ConsulUtil;
-import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.ws.rs.core.MediaType;
-
-@Deprecated
-public class AvailableNodesFetcher {
-
-    public AvailableNodesFetcher() {
-
-    }
-
-    public List<NodeInfoDescription> fetchNodes() {
-        List<String> activeNodes = getActiveNodesFromConsul();
-        List<NodeInfoDescription> nodeInfos = new ArrayList<>();
-        activeNodes.forEach(activeNode -> {
-            try {
-                nodeInfos.add(fetchNodeInfo(activeNode));
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        });
-
-        return nodeInfos;
-    }
-
-    private List<String> getActiveNodesFromConsul() {
-        return ConsulUtil.getActiveNodeEndpoints();
-    }
-
-    private NodeInfoDescription fetchNodeInfo(String activeNode) throws IOException {
-        String response = Request
-                .Get(activeNode + "/api/v2/node/info")
-                .addHeader("Accept", MediaType.APPLICATION_JSON)
-                .execute()
-                .returnContent()
-                .asString();
-
-        return JacksonSerializer.getObjectMapper().readValue(response, NodeInfoDescription.class);
-    }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/healthcheck/ClusterHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/healthcheck/ClusterHealthCheck.java
deleted file mode 100644
index 9ca97c4..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/management/healthcheck/ClusterHealthCheck.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.node.management.healthcheck;
-
-import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-
-public class ClusterHealthCheck {
-    private static final Logger LOG = LoggerFactory.getLogger(ClusterHealthCheck.class.getCanonicalName());
-
-    private static final String PROTOCOL = "http://";
-    private static final String COLON = ":";
-    private static final long RETRY_INTERVAL_MS = 5000;
-    private static final int CONNECT_TIMEOUT = 1000;
-
-    public static boolean check(NodeInfoDescription node) {
-        return healthCheck(node);
-    }
-
-    private static boolean healthCheck(NodeInfoDescription desc) {
-        String url = generateHealthCheckEndpoint(desc);
-        // call node controller REST endpoints
-        //return get(url).contains("PONG");
-
-        String nodeCtlId = desc.getNodeControllerId();
-        boolean isAlive = false;
-        String host = desc.getHostname();
-        int port = desc.getPort();
-
-        int retries = 5;
-        for (int i = 0 ; i < retries ; i++) {
-            try {
-                LOG.info("Trying to health check node controller={} ({})", nodeCtlId, i+1 + "/" + retries);
-                InetSocketAddress sa = new InetSocketAddress(host, port);
-                Socket ss = new Socket();
-                ss.connect(sa, 500);
-                ss.close();
-                if (ss.isConnected()) {
-                    isAlive = true;
-                    break;
-                }
-                Thread.sleep(1000);
-            } catch (IOException | InterruptedException e) {
-                continue;
-            }
-            isAlive = true;
-        }
-        LOG.info(isAlive ? "Successfully health check node controller=" + url :
-                "Could not perform health check node with controller=" + url);
-        return isAlive;
-    }
-
-    // Helpers
-
-    private static String generateHealthCheckEndpoint(NodeInfoDescription desc) {
-        return PROTOCOL
-                + desc.getHostname()
-                + COLON
-                + desc.getPort()
-                + "/healthy";
-    }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 2b56c93..ec8e337 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -26,7 +26,9 @@ import org.apache.streampipes.manager.execution.pipeline.PipelineExecutor;
 import org.apache.streampipes.manager.execution.pipeline.PipelineStorageService;
 import org.apache.streampipes.manager.matching.DataSetGroundingSelector;
 import org.apache.streampipes.manager.matching.PipelineVerificationHandler;
+import org.apache.streampipes.manager.migration.MigrationPipelineGenerator;
 import org.apache.streampipes.manager.migration.PipelineElementMigrationHandler;
+import org.apache.streampipes.manager.migration.PipelineElementOffloadHandler;
 import org.apache.streampipes.manager.recommender.ElementRecommender;
 import org.apache.streampipes.manager.reconfiguration.PipelineElementReconfigurationHandler;
 import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
@@ -38,11 +40,10 @@ import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
 import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
 import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
-import org.apache.streampipes.model.message.DataSetModificationMessage;
-import org.apache.streampipes.model.message.Message;
-import org.apache.streampipes.model.message.PipelineModificationMessage;
+import org.apache.streampipes.model.message.*;
 import org.apache.streampipes.model.pipeline.*;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
@@ -196,4 +197,8 @@ public class Operations {
   public static PipelineOperationStatus handlePipelineElementReconfiguration(Pipeline reconfiguredPipeline) {
     return new PipelineElementReconfigurationHandler(reconfiguredPipeline).handleReconfiguration();
   }
+
+  public static Message handlePipelineElementOffloadRequest(InvocableStreamPipesEntity elementToOffload) {
+    return new PipelineElementOffloadHandler(elementToOffload).handleOffloading();
+  }
 }
diff --git a/streampipes-rest/pom.xml b/streampipes-rest/pom.xml
index 2b14a26..75a3a84 100644
--- a/streampipes-rest/pom.xml
+++ b/streampipes-rest/pom.xml
@@ -140,5 +140,11 @@
             <artifactId>powermock-api-mockito2</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-node-management</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INodeManagement.java
similarity index 88%
rename from streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java
rename to streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INodeManagement.java
index 18322d7..7a5f601 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INodeManagement.java
@@ -21,19 +21,12 @@ import org.apache.streampipes.model.node.NodeInfoDescription;
 
 import javax.ws.rs.core.Response;
 
-public interface INode {
-
+public interface INodeManagement {
     Response addNode(String username, NodeInfoDescription desc);
-
     Response updateNode(String username, String nodeControllerId, NodeInfoDescription desc);
-
     Response syncRemoteUpdateFromNodeController(String username, NodeInfoDescription desc);
-
     Response deleteNode(String username, String nodeControllerId);
-
-    Response changeNodeState(String action, String username, String nodeControllerId);
-
-    Response getAvailableNodes();
-
+    Response changeNodeCondition(String condition, String username, String nodeControllerId);
+    Response getOnlineNodes();
     Response getNodes();
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/NodeManagementResource.java
similarity index 60%
rename from streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
rename to streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/NodeManagementResource.java
index 5ccd75b..c9acd31 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/NodeManagementResource.java
@@ -17,16 +17,13 @@
  */
 package org.apache.streampipes.rest.impl;
 
-import org.apache.streampipes.manager.migration.MigrationPipelineGenerator;
-import org.apache.streampipes.manager.node.StreamPipesClusterManager;
-import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
 import org.apache.streampipes.model.message.NotificationType;
 import org.apache.streampipes.model.message.Notifications;
+import org.apache.streampipes.model.node.NodeCondition;
 import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.rest.api.INode;
+import org.apache.streampipes.node.management.NodeManagement;
+import org.apache.streampipes.rest.api.INodeManagement;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 
 import javax.ws.rs.*;
@@ -34,10 +31,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 @Path("/v2/users/{username}/nodes")
-public class Node extends AbstractRestResource implements INode {
-
-    private static final String ACTIVATE = "activate";
-    private static final String DEACTIVATE = "deactivate";
+public class NodeManagementResource extends AbstractRestResource implements INodeManagement {
 
     @POST
     @JacksonSerialized
@@ -45,7 +39,7 @@ public class Node extends AbstractRestResource implements INode {
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response addNode(@PathParam("username") String username, NodeInfoDescription desc) {
-        return statusMessage(StreamPipesClusterManager.addOrRejoin(desc));
+        return statusMessage(NodeManagement.addOrRejoin(desc));
     }
 
     @PUT
@@ -57,7 +51,7 @@ public class Node extends AbstractRestResource implements INode {
     public Response updateNode(@PathParam("username") String username,
                                @PathParam("nodeControllerId") String nodeControllerId,
                                NodeInfoDescription desc) {
-        return statusMessage(StreamPipesClusterManager.updateNode(desc));
+        return statusMessage(NodeManagement.updateNode(desc));
     }
 
     @POST
@@ -68,24 +62,20 @@ public class Node extends AbstractRestResource implements INode {
     @Override
     public Response syncRemoteUpdateFromNodeController(@PathParam("username") String username,
                                                        NodeInfoDescription desc) {
-        return statusMessage(StreamPipesClusterManager.syncRemoteNodeUpdateRequest(desc));
+        return statusMessage(NodeManagement.syncRemoteNodeUpdateRequest(desc));
     }
 
     @POST
     @JacksonSerialized
-    @Path("/{action}/{nodeControllerId}")
+    @Path("/{condition}/{nodeControllerId}")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     @Override
-    public Response changeNodeState(@PathParam("action") String action,
-                                    @PathParam("username") String username,
-                                    @PathParam("nodeControllerId") String nodeControllerId) {
-        boolean success = false;
-        if (action.equals(ACTIVATE)) {
-            success = StreamPipesClusterManager.activateNode(nodeControllerId);
-        } else if (action.equals(DEACTIVATE)) {
-            success = StreamPipesClusterManager.deactivateNode(nodeControllerId);
-        }
+    public Response changeNodeCondition(@PathParam("condition") String condition,
+                                        @PathParam("username") String username,
+                                        @PathParam("nodeControllerId") String nodeControllerId) {
+        NodeCondition nodeCondition = NodeCondition.valueOf(condition.toUpperCase());
+        boolean success = NodeManagement.updateNodeCondition(nodeControllerId, nodeCondition);
         if (success) {
             return statusMessage(Notifications.success(NotificationType.OPERATION_SUCCESS));
         } else {
@@ -98,17 +88,17 @@ public class Node extends AbstractRestResource implements INode {
     @Override
     public Response deleteNode(@PathParam("username") String username,
                                @PathParam("nodeControllerId") String nodeControllerId) {
-        StreamPipesClusterManager.deleteNode(nodeControllerId);
+        NodeManagement.deleteNode(nodeControllerId);
         return statusMessage(Notifications.success(NotificationType.REMOVED_NODE));
     }
 
     @GET
-    @Path("/available")
+    @Path("/online")
     @JacksonSerialized
     @Produces(MediaType.APPLICATION_JSON)
     @Override
-    public Response getAvailableNodes() {
-        return ok(StreamPipesClusterManager.getAllActiveAndHealthyNodes());
+    public Response getOnlineNodes() {
+        return ok(NodeManagement.getOnlineNodes());
     }
 
     @GET
@@ -116,15 +106,6 @@ public class Node extends AbstractRestResource implements INode {
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response getNodes() {
-        return ok(StreamPipesClusterManager.getAllNodes());
-    }
-
-
-    @POST
-    @Path("/offload")
-    @Produces(MediaType.APPLICATION_JSON)
-    @JacksonSerialized
-    public Response migrateProcessorToOtherNode(InvocableStreamPipesEntity elementToMigrate) {
-        return statusMessage(StreamPipesClusterManager.handleOffloadRequest(elementToMigrate));
+        return ok(NodeManagement.getAllNodes());
     }
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index d86439e..fb9b769 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -29,6 +29,7 @@ import org.apache.streampipes.commons.exceptions.*;
 import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
 import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.client.exception.InvalidConnectionException;
 import org.apache.streampipes.model.message.Notification;
 import org.apache.streampipes.model.message.NotificationType;
@@ -305,4 +306,15 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
     }
   }
 
+  @POST
+  @Path("/offload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @JacksonSerialized
+  @Operation(summary = "Offload pipeline elements to new node",
+          tags = {"Pipeline"})
+  public Response offloadRequestFromNodeController(InvocableStreamPipesEntity elementToOffload) {
+    // Offloading is delegated to Operations and the outcome Message is wrapped into the REST response.
+    return statusMessage(Operations.handlePipelineElementOffloadRequest(elementToOffload));
+  }
+
 }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java
index 5107bfa..243dff6 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java
@@ -57,7 +57,7 @@ public class NodeInfoStorageImpl extends AbstractDao<NodeInfoDescription> implem
     public void deactivateNode(String nodeControllerId) {
         Optional<NodeInfoDescription> storedNode = getNode(nodeControllerId);
         if (storedNode.isPresent()) {
-            LOG.info("Deactivate node controller={} at url=http://{}:{}",
+            LOG.debug("Deactivate node controller={} at url=http://{}:{}",
                     storedNode.get().getNodeControllerId(),
                     storedNode.get().getHostname(),
                     storedNode.get().getPort());
@@ -70,7 +70,7 @@ public class NodeInfoStorageImpl extends AbstractDao<NodeInfoDescription> implem
     public void activateNode(String nodeControllerId) {
         Optional<NodeInfoDescription> storedNode = getNode(nodeControllerId);
         if (storedNode.isPresent()) {
-            LOG.info("Activate node controller={} at url=http://{}:{}",
+            LOG.debug("Activate node controller={} at url=http://{}:{}",
                     storedNode.get().getNodeControllerId(),
                     storedNode.get().getHostname(),
                     storedNode.get().getPort());
@@ -81,7 +81,7 @@ public class NodeInfoStorageImpl extends AbstractDao<NodeInfoDescription> implem
 
     @Override
     public void updateNode(NodeInfoDescription desc) {
-        LOG.info("Update node description for node id={}, url={}", desc.getNodeControllerId(),
+        LOG.debug("Update node description for node id={}, url={}", desc.getNodeControllerId(),
                 desc.getHostname() + ":" + desc.getPort());
 
         Optional<NodeInfoDescription> storedNode = getNode(desc.getNodeControllerId());
@@ -103,7 +103,7 @@ public class NodeInfoStorageImpl extends AbstractDao<NodeInfoDescription> implem
 
     @Override
     public void deleteNode(String nodeControllerId) {
-        LOG.info("Delete node with node id={}", nodeControllerId);
+        LOG.debug("Delete node with node id={}", nodeControllerId);
         Optional<NodeInfoDescription> storedNode = getNode(nodeControllerId);
 
         storedNode.ifPresent(nodeInfoDescription -> delete(nodeInfoDescription.getId()));
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index 62ba1eb..51a6cb9 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -362,7 +362,7 @@ public class StreamPipes {
   public static final String IS_ENCRYPTED = NS + "isEncrypted";
 
   public static final String DEPLOYMENT_DOCKER_CONTAINER = NS + "pipelineElementDockerContainer";
-  public static final String DEPLOYMENT_CONTAINER_IMAGE_URI = NS + "dockerContainerImageUri";
+  public static final String DEPLOYMENT_CONTAINER_IMAGE_TAG = NS + "dockerContainerImageTag";
   public static final String DEPLOYMENT_CONTAINER_NAME = NS + "dockerContainerName";
   public static final String DEPLOYMENT_CONTAINER_SERVICE_ID = NS + "dockerContainerServiceId";
   public static final String DEPLOYMENT_CONTAINER_PORTS = NS + "dockerContainerPorts";
@@ -370,6 +370,8 @@ public class StreamPipes {
   public static final String DEPLOYMENT_CONTAINER_LABELS = NS + "dockerContainerLabels";
   public static final String DEPLOYMENT_CONTAINER_VOLUMES = NS + "dockerContainerVolumes";
   public static final String DEPLOYMENT_CONTAINER_DEPENDENCIES = NS + "dockerContainerDependencies";
+  public static final String DEPLOYMENT_SUPPORTED_ARCHITECTURES = NS + "dockerContainerSupportedArchitectures";
+  public static final String DEPLOYMENT_SUPPORTED_OS_TYPES = NS + "dockerContainerSupportedOperatingSystemTypes";
 
   // UI Rendering
 
diff --git a/ui/src/app/configuration/node-configuration/node-configuration.component.html b/ui/src/app/configuration/node-configuration/node-configuration.component.html
index e94dcf2..ba8bdd4 100644
--- a/ui/src/app/configuration/node-configuration/node-configuration.component.html
+++ b/ui/src/app/configuration/node-configuration/node-configuration.component.html
@@ -48,6 +48,7 @@
                                 <mat-menu #menu="matMenu" style="font-size: 10pt">
                                     <button mat-menu-item
                                             *ngIf="!node.active"
+                                            [disabled]="node.condition === 'OFFLINE' || node.condition === 'CONNECTING'"
                                             (click)="changeNodeState(node, true)">
                                         <mat-icon
                                                 [ngClass]="node.active ? 'node-activated' : 'node-inactive'"
@@ -59,6 +60,7 @@
                                     </button>
                                     <button mat-menu-item
                                             *ngIf="node.active"
+                                            [disabled]="node.condition === 'OFFLINE' || node.condition === 'CONNECTING'"
                                             (click)="changeNodeState(node, false)">
                                         <mat-icon
                                                 [ngClass]="node.active ? 'node-inactive' : 'node-deactivated'"
@@ -81,6 +83,7 @@
 <!--                                        <span>evict</span>-->
 <!--                                    </button>-->
                                     <button mat-menu-item
+                                            [disabled]="!node.active || node.condition === 'OFFLINE'"
                                             matTooltip="Edit node configuration"
                                             matTooltipPosition="above"
                                             (click)="settings(node)">
@@ -104,9 +107,21 @@
                             <mat-card-subtitle style="font-size: 10pt">
                                 {{node.nodeControllerId}} | <b>{{node.nodeResources.softwareResource.os}}</b>
                                 <div style="color: lightgrey;">
-                                    last contact: {{currentTimestamp}}
-                                    <span *ngIf="node.active" class="span-node-active">active</span>
-                                    <span *ngIf="!node.active" class="span-node-inactive">inactive</span>
+                                    last contact: {{this.toHumanReadableDate(node.lastHeartBeatTime)}}
+                                    <span *ngIf="node.condition === 'CONNECTING' && node.active"
+                                          class="span-node-active">
+                                        connecting</span>
+                                    <span *ngIf="node.condition === 'ONLINE' && node.active" class="span-node-active">
+                                        online</span>
+                                    <span *ngIf="node.condition === 'ONLINE' && !node.active"
+                                          class="span-node-inactive">
+                                        deactivated</span>
+                                    <span *ngIf="node.condition === 'OFFLINE' && node.active"
+                                          class="span-node-inactive">offline
+                                    </span>
+                                    <span *ngIf="node.condition === 'OFFLINE' && !node.active"
+                                          class="span-node-inactive">lost
+                                    </span>
                                 </div>
                             </mat-card-subtitle>
                         </mat-card-header>
diff --git a/ui/src/app/configuration/node-configuration/node-configuration.component.scss b/ui/src/app/configuration/node-configuration/node-configuration.component.scss
index 4c0a4ff..a51354f 100644
--- a/ui/src/app/configuration/node-configuration/node-configuration.component.scss
+++ b/ui/src/app/configuration/node-configuration/node-configuration.component.scss
@@ -76,6 +76,8 @@ li {
     background: $sp-color-primary;
     border: 1px solid $sp-color-primary;
     padding: 2px;
+    margin-left: 2px;
+    margin-right: 2px;
 }
 
 .span-node-inactive{
@@ -85,6 +87,8 @@ li {
     background: white;
     border: 1px solid #f1c40f;
     padding: 2px;
+    margin-left: 2px;
+    margin-right: 2px;
 }
 
 .div-sa-tag{
diff --git a/ui/src/app/configuration/node-configuration/node-configuration.component.ts b/ui/src/app/configuration/node-configuration/node-configuration.component.ts
index 0e0369e..5e2ad46 100644
--- a/ui/src/app/configuration/node-configuration/node-configuration.component.ts
+++ b/ui/src/app/configuration/node-configuration/node-configuration.component.ts
@@ -17,12 +17,7 @@
  */
 
 import {Component, OnInit, ViewEncapsulation} from "@angular/core";
-import {
-    ContainerRuntime, ContainerRuntimeUnion, DockerContainerRuntime,
-    FieldDeviceAccessResource,
-    NodeInfoDescription,
-    NvidiaContainerRuntime, UnnamedStreamPipesEntity
-} from "../../core-model/gen/streampipes-model";
+import {FieldDeviceAccessResource, NodeInfoDescription} from "../../core-model/gen/streampipes-model";
 import {MatSnackBar} from "@angular/material/snack-bar";
 import {zip} from "rxjs";
 import {DialogService} from "../../core-ui/dialog/base-dialog/base-dialog.service";
@@ -56,6 +51,7 @@ export class NodeConfigurationComponent implements OnInit{
     locationTags: String[];
     fieldDevices: FieldDeviceAccessResource[];
     currentTimestamp: string;
+    runningPipelines: boolean = false;
 
     constructor(private nodeService: NodeService,
                 private dataMarketplaceService: DataMarketplaceService,
@@ -65,7 +61,6 @@ export class NodeConfigurationComponent implements OnInit{
 
     ngOnInit() {
         this.getNodes();
-        this.getDate();
     }
 
     getNodes() {
@@ -157,6 +152,22 @@ export class NodeConfigurationComponent implements OnInit{
         })
     }
 
+    // Resolves to true when at least one running pipeline has a sink (action)
+    // whose deploymentTargetNodeId equals the given nodeControllerId.
+    checkNodeForSinks(nodeControllerId: string) {
+        return new Promise<boolean>(resolve => {
+            zip(this.pipelineService.getOwnPipelines(),
+                this.pipelineService.getSystemPipelines()).subscribe(allPipelines => {
+                // some() accumulates across every pipeline list; assigning a flag inside
+                // nested forEach loops would keep only the last pipeline's result.
+                const detectedSinks = allPipelines.some(pipelines =>
+                    pipelines.some(pipeline => pipeline.running && pipeline
+                        .actions.some(action => action.deploymentTargetNodeId === nodeControllerId)));
+                resolve(detectedSinks);
+            });
+        });
+    }
+
     openSnackBar(message: string) {
         this._snackBar.open(message, "close", {
             duration: 3000,
@@ -190,4 +201,9 @@ export class NodeConfigurationComponent implements OnInit{
         this.currentTimestamp = new Date().toLocaleTimeString(['en-US'],
             { hour: '2-digit', minute: "2-digit", second: "2-digit" });
     }
+
+    toHumanReadableDate(heartbeat: number){
+        return new Date(heartbeat).toLocaleTimeString(['en-US'],
+            { hour: '2-digit', minute: "2-digit", second: "2-digit" });
+    }
 }
\ No newline at end of file
diff --git a/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts b/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts
index ed7c992..f97a8dc 100644
--- a/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts
+++ b/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts
@@ -246,10 +246,10 @@ export class DataMarketplaceComponent implements OnInit {
     async filterForActiveNodes(adapters: AdapterDescriptionUnion[]) {
         return new Promise<AdapterDescriptionUnion[]>(resolve => {
             var activeAdapters: AdapterDescriptionUnion[] = [];
-            this.nodeService.getAvailableNodes().subscribe(activeNodes => {
+            this.nodeService.getOnlineNodes().subscribe(activeNodes => {
                 activeNodes.forEach(nodes => {
                     adapters.forEach(adapter => {
-                        if (nodes.active && nodes.nodeControllerId === adapter.deploymentTargetNodeId) {
+                        if (nodes.condition === 'ONLINE' && nodes.nodeControllerId === adapter.deploymentTargetNodeId) {
                             activeAdapters.push(adapter);
                         }
                     })
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 5968a83..67b2985 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,7 +19,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2021-04-23 19:28:53.
+// Generated using typescript-generator version 2.27.744 on 2021-04-30 09:41:31.
 
 export class AbstractStreamPipesEntity {
     "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
@@ -1224,9 +1224,11 @@ export class DeploymentContainer extends UnnamedStreamPipesEntity {
     containerPorts: string[];
     dependsOnContainers: string[];
     envVars: string[];
-    imageUri: string;
+    imageTag: string;
     labels: { [index: string]: string };
     serviceId: string;
+    supportedArchitectures: string[];
+    supportedOperatingSystemTypes: string[];
     volumes: string[];
 
     static fromData(data: DeploymentContainer, target?: DeploymentContainer): DeploymentContainer {
@@ -1235,13 +1237,15 @@ export class DeploymentContainer extends UnnamedStreamPipesEntity {
         }
         const instance = target || new DeploymentContainer();
         super.fromData(data, instance);
-        instance.imageUri = data.imageUri;
+        instance.imageTag = data.imageTag;
         instance.containerName = data.containerName;
         instance.serviceId = data.serviceId;
         instance.containerPorts = __getCopyArrayFn(__identity<string>())(data.containerPorts);
         instance.envVars = __getCopyArrayFn(__identity<string>())(data.envVars);
         instance.labels = __getCopyObjectFn(__identity<string>())(data.labels);
         instance.volumes = __getCopyArrayFn(__identity<string>())(data.volumes);
+        instance.supportedArchitectures = __getCopyArrayFn(__identity<string>())(data.supportedArchitectures);
+        instance.supportedOperatingSystemTypes = __getCopyArrayFn(__identity<string>())(data.supportedOperatingSystemTypes);
         instance.dependsOnContainers = __getCopyArrayFn(__identity<string>())(data.dependsOnContainers);
         return instance;
     }
@@ -2296,7 +2300,9 @@ export class NodeInfoDescription extends UnnamedStreamPipesEntity {
     _id: string;
     _rev: string;
     active: boolean;
+    condition: NodeCondition;
     hostname: string;
+    lastHeartBeatTime: number;
     nodeBroker: NodeBrokerDescription;
     nodeControllerId: string;
     nodeResources: NodeResource;
@@ -2312,6 +2318,8 @@ export class NodeInfoDescription extends UnnamedStreamPipesEntity {
         const instance = target || new NodeInfoDescription();
         super.fromData(data, instance);
         instance.active = data.active;
+        instance.condition = data.condition;
+        instance.lastHeartBeatTime = data.lastHeartBeatTime;
         instance.nodeControllerId = data.nodeControllerId;
         instance.hostname = data.hostname;
         instance.port = data.port;
@@ -3426,6 +3434,8 @@ export type MappingPropertyUnion = MappingPropertyNary | MappingPropertyUnary;
 
 export type MeasurementPropertyUnion = EventPropertyQualityDefinition | EventStreamQualityDefinition;
 
+export type NodeCondition = "ACTIVE" | "INACTIVE" | "ONLINE" | "OFFLINE" | "CREATED" | "CONNECTING" | "PAUSED";
+
 export type NodeResourceRequirementUnion = Hardware;
 
 export type OneOfStaticPropertyUnion = RuntimeResolvableOneOfStaticProperty;
diff --git a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
index c94f7bb..cbd6276 100644
--- a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
+++ b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
@@ -116,7 +116,7 @@ export class MigratePipelineProcessorsComponent implements OnInit {
   }
 
   loadAndPrepareEdgeNodes() {
-    this.nodeService.getAvailableNodes().subscribe(response => {
+    this.nodeService.getOnlineNodes().subscribe(response => {
       this.edgeNodes = response;
       this.addAppIds(this.tmpPipeline.sepas, this.edgeNodes);
       this.addAppIds(this.tmpPipeline.actions, this.edgeNodes);
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index 7373f67..50d6a09 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -98,8 +98,6 @@ export class SavePipelineComponent implements OnInit {
   ngOnInit() {
     this.tmpPipeline = this.deepCopy(this.pipeline);
 
-    console.log(this.pipeline);
-
     this.priorityForm = this.formBuilder.group({
       priorityForm: [null, Validators.required]
     });
@@ -186,7 +184,7 @@ export class SavePipelineComponent implements OnInit {
         this.deploymentOptions[processor.appId] = [];
         this.deploymentOptions[processor.appId].push(this.makeDefaultNodeInfo());
 
-        this.nodeService.getAvailableNodes().subscribe((response : NodeInfoDescription []) => {
+        this.nodeService.getOnlineNodes().subscribe((response : NodeInfoDescription []) => {
           this.edgeNodes = response;
           this.edgeNodes.forEach(nodeInfo => {
             // only show nodes that actually have supported pipeline elements registered
@@ -224,6 +222,18 @@ export class SavePipelineComponent implements OnInit {
         })
       })
 
+      // this.tmpPipeline.actions.forEach(actions => {
+      //   this.deploymentOptions[actions.appId] = [];
+      //
+      //   filteredNodes.forEach(filteredNode => {
+      //
+      //     if (filteredNode.supportedElements.length != 0 &&
+      //         filteredNode.supportedElements.some(appId => appId === actions.appId)) {
+      //       this.deploymentOptions[actions.appId].push(filteredNode);
+      //     }
+      //   })
+      // })
+
     } else {
       this.addAppIds(this.tmpPipeline.sepas, this.edgeNodes);
       this.addAppIds(this.tmpPipeline.actions, this.edgeNodes);
@@ -238,14 +248,15 @@ export class SavePipelineComponent implements OnInit {
       this.deploymentOptions[processor.appId].push(this.makeDefaultNodeInfo());
     });
 
-    // this.tmpPipeline.actions.forEach(p => {
-    //   p.deploymentTargetNodeId = "default";
-    //   // this.deploymentOptions[p.appId].push(this.makeDefaultNodeInfo());
-    // });
+    this.tmpPipeline.actions.forEach(action => {
+      action.deploymentTargetNodeId = "default";
+      this.deploymentOptions[action.appId] = []
+      this.deploymentOptions[action.appId].push(this.makeDefaultNodeInfo());
+    });
   }
 
   loadAndPrepareEdgeNodes() {
-    this.nodeService.getAvailableNodes().subscribe((response : NodeInfoDescription []) => {
+    this.nodeService.getOnlineNodes().subscribe((response : NodeInfoDescription []) => {
       this.edgeNodes = response;
       this.addAppIds(this.tmpPipeline.sepas, this.edgeNodes);
       this.addAppIds(this.tmpPipeline.actions, this.edgeNodes);
@@ -256,17 +267,17 @@ export class SavePipelineComponent implements OnInit {
     pipelineElements.forEach(p => {
       this.deploymentOptions[p.appId] = [];
 
-      // if (p instanceof DataSinkInvocation) {
-      //   if (p.deploymentTargetNodeId == null) {
-      //     p.deploymentTargetNodeId = "default";
-      //   }
-      //   this.deploymentOptions[p.appId].push(this.makeDefaultNodeInfo());
-      // }
-
-      if (p.deploymentTargetNodeId == null) {
-        p.deploymentTargetNodeId = "default";
+      if (p instanceof DataSinkInvocation) {
+        if (p.deploymentTargetNodeId == null) {
+          p.deploymentTargetNodeId = "default";
+        }
+        this.deploymentOptions[p.appId].push(this.makeDefaultNodeInfo());
       }
-      this.deploymentOptions[p.appId].push(this.makeDefaultNodeInfo());
+
+      // if (p.deploymentTargetNodeId == null) {
+      //   p.deploymentTargetNodeId = "default";
+      // }
+      // this.deploymentOptions[p.appId].push(this.makeDefaultNodeInfo());
 
       edgeNodes.forEach(nodeInfo => {
         // only show nodes that actually have supported pipeline elements registered
@@ -333,7 +344,7 @@ export class SavePipelineComponent implements OnInit {
       } else {
         this.tmpPipeline.priorityScore = 0;
       }
-      if (this.selectedNodeTags.length > 0 && this.selectedPipelineExecutionPolicy === "custom") {
+      if (this.selectedNodeTags?.length > 0 && this.selectedPipelineExecutionPolicy === "custom") {
         this.tmpPipeline.nodeTags = this.selectedNodeTags;
       } else {
         this.tmpPipeline.nodeTags = null;
@@ -353,7 +364,7 @@ export class SavePipelineComponent implements OnInit {
       } else {
         this.tmpPipeline.priorityScore = 0;
       }
-      if (this.selectedNodeTags.length > 0 && this.selectedPipelineExecutionPolicy === "custom") {
+      if (this.selectedNodeTags?.length > 0 && this.selectedPipelineExecutionPolicy === "custom") {
         this.tmpPipeline.nodeTags = this.selectedNodeTags;
       } else {
         this.tmpPipeline.nodeTags = null;
@@ -405,6 +416,8 @@ export class SavePipelineComponent implements OnInit {
     if (value == "custom") {
       this.panelOpenState = true;
       this.disableNodeSelection.setValue(false);
+      //this.addAppIds(this.tmpPipeline.sepas, this.edgeNodes);
+      //this.addAppIds(this.tmpPipeline.actions, this.edgeNodes);
       // use same policy for initial mapping
       this.applyLocalityAwarePolicy()
     } else if (value == "locality-aware") {
diff --git a/ui/src/app/platform-services/apis/node.service.ts b/ui/src/app/platform-services/apis/node.service.ts
index 921edb5..84fb269 100644
--- a/ui/src/app/platform-services/apis/node.service.ts
+++ b/ui/src/app/platform-services/apis/node.service.ts
@@ -39,8 +39,8 @@ export class NodeService {
             }));
     }
 
-    getAvailableNodes(): Observable<NodeInfoDescription[]> {
-        return this.http.get(this.platformServicesCommons.authUserBasePath() + "/nodes/available")
+    getOnlineNodes(): Observable<NodeInfoDescription[]> {
+        return this.http.get(this.platformServicesCommons.authUserBasePath() + "/nodes/online")
             .pipe(map(response => {
                 return response as NodeInfoDescription[];
             }));
@@ -54,14 +54,14 @@ export class NodeService {
     }
 
     activateNode(nodeControllerId: string): Observable<Message> {
-        return this.http.post(this.platformServicesCommons.authUserBasePath() + '/nodes/activate/' + nodeControllerId, {})
+        return this.http.post(this.platformServicesCommons.authUserBasePath() + '/nodes/active/' + nodeControllerId, {})
             .pipe(map(response => {
                 return Message.fromData(response as Message);
             }));
     }
 
     deactivateNode(nodeControllerId: string): Observable<Message> {
-        return this.http.post(this.platformServicesCommons.authUserBasePath() + '/nodes/deactivate/' + nodeControllerId, {})
+        return this.http.post(this.platformServicesCommons.authUserBasePath() + '/nodes/inactive/' + nodeControllerId, {})
             .pipe(map(response => {
                 return Message.fromData(response as Message);
             }));

[incubator-streampipes] 02/02: add skeleton for node resource management

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 3b5901be5c8e27f29dca8d04bbc8f768e7d24cd8
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon May 3 17:22:19 2021 +0200

    add skeleton for node resource management
---
 .../backend/StreamPipesBackendApplication.java     |  5 +-
 .../model/node/monitor}/ResourceMetrics.java       |  2 +-
 .../api/NodeInfoDescriptionResource.java           |  1 -
 .../management/resource/ResourceManager.java       |  2 +-
 .../node/management/NodeManagement.java            | 43 ++++++++---
 .../{health/HealthCheck.java => NodeMonitor.java}  | 10 +--
 .../monitor/health/ClusterHealthCheckMonitor.java  |  2 +-
 .../operation/monitor/health/NodeHealthCheck.java  | 29 +++----
 .../monitor/resource/ClusterResourceMonitor.java   | 65 ++++++++++++++++
 .../NodeResourceCollector.java}                    | 49 ++++++------
 .../monitor/resources/ClusterResourceMonitor.java  | 90 ----------------------
 .../migration/MigrationPipelineGenerator.java      |  2 +-
 .../rest/impl/NodeManagementResource.java          | 14 ++--
 13 files changed, 157 insertions(+), 157 deletions(-)

diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index c5ca79b..95263dcb 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -23,6 +23,7 @@ import org.apache.shiro.web.servlet.ShiroFilter;
 import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.node.management.NodeManagement;
 import org.apache.streampipes.node.management.operation.monitor.health.ClusterHealthCheckMonitor;
 import org.apache.streampipes.rest.notifications.NotificationListener;
 import org.apache.streampipes.storage.management.StorageDispatcher;
@@ -61,8 +62,8 @@ public class StreamPipesBackendApplication {
     ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
     executorService.schedule(this::startAllPreviouslyStoppedPipelines, 5, TimeUnit.SECONDS);
 
-    LOG.info("Starting StreamPipes cluster monitor...");
-    ClusterHealthCheckMonitor.getInstance().run();
+    LOG.info("Starting StreamPipes node management ...");
+    NodeManagement.getInstance().init();
   }
 
   @PreDestroy
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/monitor/ResourceMetrics.java
similarity index 97%
rename from streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/node/monitor/ResourceMetrics.java
index ddaaff6..ac33845 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/model/ResourceMetrics.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/monitor/ResourceMetrics.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.management.resource.model;
+package org.apache.streampipes.model.node.monitor;
 
 public class ResourceMetrics {
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java
index 4e4aa76..d37e246 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java
@@ -17,7 +17,6 @@
  */
 package org.apache.streampipes.node.controller.api;
 
-import org.apache.streampipes.model.node.NodeCondition;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.node.controller.management.node.NodeManager;
 import org.apache.streampipes.node.controller.management.relay.DataStreamRelayManager;
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
index e0d185b..737f521 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/ResourceManager.java
@@ -24,7 +24,7 @@ import org.apache.streampipes.node.controller.management.offloading.AutoOffloadi
 import org.apache.streampipes.node.controller.management.offloading.policies.Comparator;
 import org.apache.streampipes.node.controller.management.offloading.policies.MultiOccurrenceThresholdViolationPolicy;
 import org.apache.streampipes.node.controller.management.offloading.policies.OffloadingPolicy;
-import org.apache.streampipes.node.controller.management.resource.model.ResourceMetrics;
+import org.apache.streampipes.model.node.monitor.ResourceMetrics;
 import org.apache.streampipes.node.controller.management.resource.utils.DiskSpace;
 import org.apache.streampipes.node.controller.management.resource.utils.ResourceUtils;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/NodeManagement.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/NodeManagement.java
index c4add7d..db5dbcb 100644
--- a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/NodeManagement.java
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/NodeManagement.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.node.NodeCondition;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.node.management.operation.monitor.health.ClusterHealthCheckMonitor;
+import org.apache.streampipes.node.management.operation.monitor.resource.ClusterResourceMonitor;
 import org.apache.streampipes.node.management.operation.relay.RelayHandler;
 import org.apache.streampipes.node.management.operation.sync.SynchronizationFactory;
 import org.apache.streampipes.node.management.operation.sync.SynchronizationType;
@@ -36,17 +37,41 @@ import java.util.List;
 import java.util.Optional;
 
 public class NodeManagement {
+
     private static final Logger LOG = LoggerFactory.getLogger(NodeManagement.class.getCanonicalName());
+    private static NodeManagement instance = null;
+
+    private NodeManagement() {
+    }
+
+    public static NodeManagement getInstance() {
+        if (instance == null) {
+            synchronized (NodeManagement.class) {
+                if (instance == null)
+                    instance = new NodeManagement();
+            }
+        }
+        return instance;
+    }
+
+
+    public void init() {
+        LOG.info("Starting StreamPipes healthcheck monitor...");
+        ClusterHealthCheckMonitor.getInstance().run();
+
+        LOG.info("Starting StreamPipes resource monitor...");
+        ClusterResourceMonitor.getInstance().run();
+    }
 
-    public static List<NodeInfoDescription> getOnlineNodes() {
+    public List<NodeInfoDescription> getOnlineNodes() {
         return ClusterHealthCheckMonitor.getInstance().getAllHealthyNodes();
     }
 
-    public static List<NodeInfoDescription> getAllNodes() {
+    public List<NodeInfoDescription> getAllNodes() {
         return StorageUtils.getAllNodes();
     }
 
-    public static boolean updateNodeCondition(String nodeControllerId, NodeCondition condition) {
+    public boolean updateNodeCondition(String nodeControllerId, NodeCondition condition) {
         Optional<NodeInfoDescription> storedNode = StorageUtils.getNode(nodeControllerId);
         boolean status = false;
 
@@ -69,7 +94,7 @@ public class NodeManagement {
         return status;
     }
 
-    public static Message updateNode(NodeInfoDescription desc) {
+    public Message updateNode(NodeInfoDescription desc) {
         boolean successfullyUpdated = SynchronizationFactory.synchronize(desc, SynchronizationType.UPDATE_NODE);
         if (successfullyUpdated) {
             StorageUtils.updateNode(desc);
@@ -78,16 +103,16 @@ public class NodeManagement {
         return Notifications.error("Could not update node");
     }
 
-    public static void deleteNode(String nodeControllerId) {
+    public void deleteNode(String nodeControllerId) {
         StorageUtils.deleteNode(nodeControllerId);
     }
 
-    public static Message syncRemoteNodeUpdateRequest(NodeInfoDescription desc) {
+    public Message syncRemoteNodeUpdateRequest(NodeInfoDescription desc) {
         StorageUtils.updateNode(desc);
         return Notifications.success("Node updated");
     }
 
-    public static Message addOrRejoin(NodeInfoDescription desc) {
+    public Message addOrRejoin(NodeInfoDescription desc) {
         Optional<NodeInfoDescription> latestDesc = StorageUtils.getLatestNodeOrElseEmpty(desc.getNodeControllerId());
 
         boolean alreadyRegistered = false;
@@ -104,7 +129,7 @@ public class NodeManagement {
         }
     }
 
-    private static Message addNewNode(NodeInfoDescription desc) throws RuntimeException {
+    private Message addNewNode(NodeInfoDescription desc) throws RuntimeException {
         try {
             StorageUtils.storeNode(desc);
             LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
@@ -114,7 +139,7 @@ public class NodeManagement {
         }
     }
 
-    private static Message rejoinAndSyncNode(NodeInfoDescription desc) {
+    private Message rejoinAndSyncNode(NodeInfoDescription desc) {
         LOG.info("Sync latest node description to http://{}:{}", desc.getHostname(), desc.getPort());
         boolean success = SynchronizationFactory.synchronize(desc, SynchronizationType.UPDATE_NODE);
         if (success) {
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/HealthCheck.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/NodeMonitor.java
similarity index 83%
rename from streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/HealthCheck.java
rename to streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/NodeMonitor.java
index 784cb9c..f54ce73 100644
--- a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/HealthCheck.java
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/NodeMonitor.java
@@ -15,13 +15,11 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.management.operation.monitor.health;
-
-import org.apache.streampipes.model.NodeHealthStatus;
+package org.apache.streampipes.node.management.operation.monitor;
 
 import java.util.concurrent.Callable;
 
-public interface HealthCheck {
-    Callable<NodeHealthStatus> nodeHealthStatusCallable();
-    NodeHealthStatus execute();
+public interface NodeMonitor<T> {
+    Callable<T> monitoringCallable();
+    T execute();
 }
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/ClusterHealthCheckMonitor.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/ClusterHealthCheckMonitor.java
index 70c8fba..6fa823d 100644
--- a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/ClusterHealthCheckMonitor.java
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/ClusterHealthCheckMonitor.java
@@ -162,7 +162,7 @@ public class ClusterHealthCheckMonitor {
 
     private void persistNodeUpdate(NodeInfoDescription node, boolean syncWithNodeController) {
         if (syncWithNodeController) {
-            NodeManagement.updateNode(node);
+            NodeManagement.getInstance().updateNode(node);
         } else {
             StorageUtils.updateNode(node);
         }
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java
index a0a0dc3..531bedd 100644
--- a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java
@@ -18,13 +18,14 @@
 package org.apache.streampipes.node.management.operation.monitor.health;
 
 import org.apache.streampipes.model.NodeHealthStatus;
+import org.apache.streampipes.node.management.operation.monitor.NodeMonitor;
 import org.apache.streampipes.node.management.utils.HttpUtils;
 
 import java.util.concurrent.*;
 
-public class NodeHealthCheck implements HealthCheck {
+public class NodeHealthCheck implements NodeMonitor<NodeHealthStatus> {
 
-    private static final int HEALTH_CHECK_FUTURE_TIMEOUT_SECS = 3;
+    private static final int FUTURE_TIMEOUT_SECS = 3;
     private final String healthCheckEndpoint;
 
     public NodeHealthCheck(String endpoint) {
@@ -32,15 +33,25 @@ public class NodeHealthCheck implements HealthCheck {
     }
 
     @Override
+    public Callable<NodeHealthStatus> monitoringCallable() {
+        return () -> {
+            if (!Thread.currentThread().isInterrupted()) {
+                return HttpUtils.get(healthCheckEndpoint, NodeHealthStatus.class);
+            }
+            return new NodeHealthStatus(false);
+        };
+    }
+
+    @Override
     public NodeHealthStatus execute() {
         ExecutorService executorService = Executors.newSingleThreadExecutor();
 
-        final Future<NodeHealthStatus> future = executorService.submit(nodeHealthStatusCallable());
+        final Future<NodeHealthStatus> future = executorService.submit(monitoringCallable());
 
         NodeHealthStatus nodeHealthStatus;
         try {
             // blocking call until timeout is reached
-            nodeHealthStatus = future.get(HEALTH_CHECK_FUTURE_TIMEOUT_SECS, TimeUnit.SECONDS);
+            nodeHealthStatus = future.get(FUTURE_TIMEOUT_SECS, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException e) {
             nodeHealthStatus = new NodeHealthStatus(false);
         } catch (TimeoutException e) {
@@ -50,14 +61,4 @@ public class NodeHealthCheck implements HealthCheck {
         executorService.shutdown();
         return nodeHealthStatus;
     }
-
-    @Override
-    public Callable<NodeHealthStatus> nodeHealthStatusCallable() {
-        return () -> {
-            if (!Thread.currentThread().isInterrupted()) {
-                return HttpUtils.get(healthCheckEndpoint, NodeHealthStatus.class);
-            }
-            return new NodeHealthStatus(false);
-        };
-    }
 }
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resource/ClusterResourceMonitor.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resource/ClusterResourceMonitor.java
new file mode 100644
index 0000000..008c3c4
--- /dev/null
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resource/ClusterResourceMonitor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.management.operation.monitor.resource;
+
+
+import org.apache.streampipes.model.node.monitor.ResourceMetrics;
+import org.apache.streampipes.node.management.operation.monitor.health.ClusterHealthCheckMonitor;
+import org.apache.streampipes.node.management.utils.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ClusterResourceMonitor {
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterResourceMonitor.class.getCanonicalName());
+
+    private static final String RESOURCE_COLLECTOR_ROUTE = "/api/v2/node/info/resources";
+    private static final int RESOURCE_COLLECTOR_INTERVAL_SECS = 60;
+    private static ClusterResourceMonitor instance = null;
+
+    private ClusterResourceMonitor() {}
+
+    public static ClusterResourceMonitor getInstance() {
+        if (instance == null) {
+            synchronized (ClusterResourceMonitor.class) {
+                if (instance == null)
+                    instance = new ClusterResourceMonitor();
+            }
+        }
+        return instance;
+    }
+
+    public void run() {
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleAtFixedRate(resourceCollector, 0, RESOURCE_COLLECTOR_INTERVAL_SECS, TimeUnit.SECONDS);
+    }
+
+    private final Runnable resourceCollector = () -> {
+        // Only collect resources for online nodes, others might not be reachable
+        ClusterHealthCheckMonitor.getInstance().getAllHealthyNodes().forEach(node -> {
+            // collect resources from node
+            String endpoint = HttpUtils.generateEndpoint(node, RESOURCE_COLLECTOR_ROUTE);
+            ResourceMetrics resourceMetrics = new NodeResourceCollector(endpoint).execute();
+
+             // TODO: implement
+        });
+    };
+}
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resource/NodeResourceCollector.java
similarity index 57%
copy from streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java
copy to streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resource/NodeResourceCollector.java
index a0a0dc3..d2f574b 100644
--- a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/health/NodeHealthCheck.java
+++ b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resource/NodeResourceCollector.java
@@ -15,49 +15,50 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.management.operation.monitor.health;
+package org.apache.streampipes.node.management.operation.monitor.resource;
 
-import org.apache.streampipes.model.NodeHealthStatus;
+import org.apache.streampipes.model.node.monitor.ResourceMetrics;
+import org.apache.streampipes.node.management.operation.monitor.NodeMonitor;
 import org.apache.streampipes.node.management.utils.HttpUtils;
 
 import java.util.concurrent.*;
 
-public class NodeHealthCheck implements HealthCheck {
+public class NodeResourceCollector implements NodeMonitor<ResourceMetrics> {
 
-    private static final int HEALTH_CHECK_FUTURE_TIMEOUT_SECS = 3;
-    private final String healthCheckEndpoint;
+    private static final int FUTURE_TIMEOUT_SECS = 3;
+    private final String resourceCollectorEndpoint;
 
-    public NodeHealthCheck(String endpoint) {
-        this.healthCheckEndpoint = endpoint;
+    public NodeResourceCollector(String resourceCollectorEndpoint) {
+        this.resourceCollectorEndpoint = resourceCollectorEndpoint;
     }
 
     @Override
-    public NodeHealthStatus execute() {
+    public Callable<ResourceMetrics> monitoringCallable() {
+        return () -> {
+            if (!Thread.currentThread().isInterrupted()) {
+                return HttpUtils.get(resourceCollectorEndpoint, ResourceMetrics.class);
+            }
+            return new ResourceMetrics();
+        };
+    }
+
+    @Override
+    public ResourceMetrics execute() {
         ExecutorService executorService = Executors.newSingleThreadExecutor();
 
-        final Future<NodeHealthStatus> future = executorService.submit(nodeHealthStatusCallable());
+        final Future<ResourceMetrics> future = executorService.submit(monitoringCallable());
 
-        NodeHealthStatus nodeHealthStatus;
+        ResourceMetrics resourceMetrics;
         try {
             // blocking call until timeout is reached
-            nodeHealthStatus = future.get(HEALTH_CHECK_FUTURE_TIMEOUT_SECS, TimeUnit.SECONDS);
+            resourceMetrics = future.get(FUTURE_TIMEOUT_SECS, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException e) {
-            nodeHealthStatus = new NodeHealthStatus(false);
+            resourceMetrics = new ResourceMetrics();
         } catch (TimeoutException e) {
             future.cancel(true);
-            nodeHealthStatus = new NodeHealthStatus(false);
+            resourceMetrics = new ResourceMetrics();
         }
         executorService.shutdown();
-        return nodeHealthStatus;
-    }
-
-    @Override
-    public Callable<NodeHealthStatus> nodeHealthStatusCallable() {
-        return () -> {
-            if (!Thread.currentThread().isInterrupted()) {
-                return HttpUtils.get(healthCheckEndpoint, NodeHealthStatus.class);
-            }
-            return new NodeHealthStatus(false);
-        };
+        return resourceMetrics;
     }
 }
diff --git a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resources/ClusterResourceMonitor.java b/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resources/ClusterResourceMonitor.java
deleted file mode 100644
index 2d1b2ff..0000000
--- a/streampipes-node-management/src/main/java/org/apache/streampipes/node/management/operation/monitor/resources/ClusterResourceMonitor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.management.operation.monitor.resources;
-
-
-import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.node.management.utils.StorageUtils;
-
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.Socket;
-import java.net.URL;
-import java.util.List;
-
-public class ClusterResourceMonitor {
-
-    private static final int RESOURCE_RETRIEVE_FREQUENCY_MS = 60000;
-    private static final int SOCKET_TIMEOUT_MS = 500;
-    private static ClusterResourceMonitor instance = null;
-
-    private ClusterResourceMonitor() {}
-
-    public static ClusterResourceMonitor getInstance() {
-        if (instance == null) {
-            synchronized (ClusterResourceMonitor.class) {
-                if (instance == null)
-                    instance = new ClusterResourceMonitor();
-            }
-        }
-        return instance;
-    }
-
-    public void run() {
-        new Thread(getNodes, "nodes").start();
-    }
-
-    private final Runnable getNodes = () -> {
-        while (true) {
-            try {
-                List<NodeInfoDescription> nodes =  StorageUtils.persistentNodeAPI().getAllNodes();
-                if (nodes.size() > 0) {
-                    nodes.forEach(node -> {
-                        try {
-                            URL nodeUrl = generateNodeUrl(node);
-                            // TODO: gather current resources from all active node controller endpoints
-
-                        } catch (MalformedURLException e) {
-                            e.printStackTrace();
-                        }
-                    });
-                }
-                Thread.sleep(RESOURCE_RETRIEVE_FREQUENCY_MS);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    };
-
-    private URL generateNodeUrl(NodeInfoDescription desc) throws MalformedURLException {
-        return new URL("http", desc.getHostname(), desc.getPort(), "");
-    }
-
-    private boolean healthCheck(URL url) {
-        boolean isAlive = true;
-        try {
-            InetSocketAddress sa = new InetSocketAddress(url.getHost(), url.getPort());
-            Socket ss = new Socket();
-            ss.connect(sa, SOCKET_TIMEOUT_MS);
-            ss.close();
-        } catch(Exception e) {
-            isAlive = false;
-        }
-        return isAlive;
-    }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
index bfb209c..f69a3b9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationPipelineGenerator.java
@@ -35,7 +35,7 @@ public class MigrationPipelineGenerator {
     public static Pipeline generateMigrationPipeline(InvocableStreamPipesEntity entityToMigrate, Pipeline correspondingPipeline){
 
         List<NodeInfoDescription> possibleTargetNodes = new ArrayList<>();
-        List<NodeInfoDescription> nodeInfo = NodeManagement.getOnlineNodes();
+        List<NodeInfoDescription> nodeInfo = NodeManagement.getInstance().getOnlineNodes();
         nodeInfo.forEach(desc ->{
             if(desc.getSupportedElements().stream().anyMatch(element -> element.equals(entityToMigrate.getAppId()))
                 && !desc.getNodeControllerId().equals(entityToMigrate.getDeploymentTargetNodeId()))
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/NodeManagementResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/NodeManagementResource.java
index c9acd31..65d58b8 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/NodeManagementResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/NodeManagementResource.java
@@ -39,7 +39,7 @@ public class NodeManagementResource extends AbstractRestResource implements INod
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response addNode(@PathParam("username") String username, NodeInfoDescription desc) {
-        return statusMessage(NodeManagement.addOrRejoin(desc));
+        return statusMessage(NodeManagement.getInstance().addOrRejoin(desc));
     }
 
     @PUT
@@ -51,7 +51,7 @@ public class NodeManagementResource extends AbstractRestResource implements INod
     public Response updateNode(@PathParam("username") String username,
                                @PathParam("nodeControllerId") String nodeControllerId,
                                NodeInfoDescription desc) {
-        return statusMessage(NodeManagement.updateNode(desc));
+        return statusMessage(NodeManagement.getInstance().updateNode(desc));
     }
 
     @POST
@@ -62,7 +62,7 @@ public class NodeManagementResource extends AbstractRestResource implements INod
     @Override
     public Response syncRemoteUpdateFromNodeController(@PathParam("username") String username,
                                                        NodeInfoDescription desc) {
-        return statusMessage(NodeManagement.syncRemoteNodeUpdateRequest(desc));
+        return statusMessage(NodeManagement.getInstance().syncRemoteNodeUpdateRequest(desc));
     }
 
     @POST
@@ -75,7 +75,7 @@ public class NodeManagementResource extends AbstractRestResource implements INod
                                         @PathParam("username") String username,
                                         @PathParam("nodeControllerId") String nodeControllerId) {
         NodeCondition nodeCondition = NodeCondition.valueOf(condition.toUpperCase());
-        boolean success = NodeManagement.updateNodeCondition(nodeControllerId, nodeCondition);
+        boolean success = NodeManagement.getInstance().updateNodeCondition(nodeControllerId, nodeCondition);
         if (success) {
             return statusMessage(Notifications.success(NotificationType.OPERATION_SUCCESS));
         } else {
@@ -88,7 +88,7 @@ public class NodeManagementResource extends AbstractRestResource implements INod
     @Override
     public Response deleteNode(@PathParam("username") String username,
                                @PathParam("nodeControllerId") String nodeControllerId) {
-        NodeManagement.deleteNode(nodeControllerId);
+        NodeManagement.getInstance().deleteNode(nodeControllerId);
         return statusMessage(Notifications.success(NotificationType.REMOVED_NODE));
     }
 
@@ -98,7 +98,7 @@ public class NodeManagementResource extends AbstractRestResource implements INod
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response getOnlineNodes() {
-        return ok(NodeManagement.getOnlineNodes());
+        return ok(NodeManagement.getInstance().getOnlineNodes());
     }
 
     @GET
@@ -106,6 +106,6 @@ public class NodeManagementResource extends AbstractRestResource implements INod
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response getNodes() {
-        return ok(NodeManagement.getAllNodes());
+        return ok(NodeManagement.getInstance().getAllNodes());
     }
 }