You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/05 10:27:43 UTC

[incubator-streampipes] branch edge-extensions updated: add sinks to be hosted on edge nodes, add mapDB file persistence to nodectlr, fix rollback

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new 04c97b6  add sinks to be hosted on edge nodes, add mapDB file persistence to nodectlr, fix rollback
04c97b6 is described below

commit 04c97b64035b09af570f22dbba53f7dc9467a591
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri Mar 5 11:27:17 2021 +0100

    add sinks to be hosted on edge nodes, add mapDB file persistence to nodectlr, fix rollback
---
 .../extensions/ExtensionsModelSubmitter.java       | 19 ++++--
 .../standalone/init/StandaloneModelSubmitter.java  | 16 ++++-
 .../container/util/NodeControllerUtil.java         | 13 ++--
 streampipes-node-controller-container/pom.xml      | 12 ++--
 ...ource.java => ContainerDeploymentResource.java} |  2 +-
 .../container/api/HealthCheckResource.java         |  2 +-
 .../container/api/InvocableEntityResource.java     |  1 -
 .../api/NodeControllerResourceConfig.java          |  2 +-
 .../container/config/NodeControllerConfig.java     | 13 ++--
 ...RunningInstances.java => RunningInstances.java} |  2 +-
 .../orchestrator/RunningContainerInstances.java    | 20 +++---
 .../management/pe/RunningInvocableInstances.java   | 22 ++++---
 .../management/relay/RunningRelayInstances.java    |  5 +-
 .../controller/container/storage/CRUDStorage.java  | 15 +++--
 .../controller/container/storage/MapDBImpl.java    | 73 ++++++++++++++++++++++
 .../manager/execution/http/ElementSubmitter.java   | 32 +++++++++-
 .../manager/execution/http/GraphSubmitter.java     |  2 +-
 .../pipeline/PipelineMigrationExecutor.java        | 15 +++--
 .../execution/pipeline/migration/Command.java      | 23 -------
 .../migration/PipelineElementStopCommand.java      | 30 ---------
 .../pipeline/migration/RelayStartCommand.java      | 30 ---------
 .../pipeline/migration/RelayStopCommand.java       | 30 ---------
 .../manager/node/AbstractClusterManager.java       | 72 ++++++++++++++++-----
 .../manager/node/NodeClusterManager.java           | 14 +++--
 .../streampipes/manager/node/NodeSyncOptions.java  |  2 +-
 .../node-configuration-details.component.html      |  7 ---
 .../node-configuration-details.component.ts        | 34 ++++++++--
 27 files changed, 299 insertions(+), 209 deletions(-)

diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 967519c..8d45a65 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -52,9 +52,8 @@ import org.springframework.context.annotation.Import;
 
 import javax.annotation.PreDestroy;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
+import java.util.stream.Collectors;
 
 @Configuration
 @EnableAutoConfiguration
@@ -99,7 +98,7 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<EdgeExtens
                     conf.getId(),
                     conf.getHost(),
                     conf.getPort(),
-                    DeclarersSingleton.getInstance().getEpaDeclarers());
+                    generateSupportedAppIds());
 
             String nodeControllerUrl = PROTOCOL + conf.getNodeControllerHost() + COLON + conf.getNodeControllerPort();
 
@@ -149,6 +148,18 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<EdgeExtens
         }
     }
 
+    private List<String> generateSupportedAppIds() {
+        // Advertise data sinks alongside data processors; the declarer map keys are treated as app ids here.
+        DeclarersSingleton declarers = DeclarersSingleton.getInstance();
+
+        // Processors first, then sinks.
+        List<String> appIds = new ArrayList<>(declarers.getEpaDeclarers().keySet());
+        appIds.addAll(declarers.getConsumerDeclarers().keySet());
+
+        // A fresh, mutable list is handed out on every call.
+        return appIds;
+    }
+
     private ConnectWorkerContainer getContainerDescription(String connectWorkerHost, int connectWorkerPort,
                                                            boolean runsOnEdgeNode) {
 
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 2fa53ae..f8e25ce 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -32,7 +32,9 @@ import org.apache.streampipes.container.init.RunningInstances;
 import org.apache.streampipes.container.model.PeConfig;
 import org.apache.streampipes.container.util.ConsulUtil;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import javax.annotation.PreDestroy;
 
@@ -65,7 +67,7 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig>
                     peConfig.getId(),
                     peConfig.getHost(),
                     peConfig.getPort(),
-                    DeclarersSingleton.getInstance().getEpaDeclarers());
+                    generateSupportedAppIds());
         } else {
             // primary
             ConsulUtil.registerPeService(
@@ -75,6 +77,18 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig>
         }
     }
 
+    /** Builds the list of app ids this container can host: data processors first, then data sinks. */
+    private List<String> generateSupportedAppIds() {
+        DeclarersSingleton declarers = DeclarersSingleton.getInstance();
+        List<String> supportedAppIds = new ArrayList<>();
+
+        // The keys of each declarer map are taken as the app ids.
+        supportedAppIds.addAll(declarers.getEpaDeclarers().keySet());
+        supportedAppIds.addAll(declarers.getConsumerDeclarers().keySet());
+
+        return supportedAppIds;
+    }
+
     @PreDestroy
     public void onExit() {
         LOG.info("Shutting down StreamPipes pipeline element container...");
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
index be7ee90..497d537 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
@@ -44,19 +44,18 @@ public class NodeControllerUtil {
     private static final String NODE_CONTROLLER_CONTAINER_HOST = "SP_NODE_CONTROLLER_CONTAINER_HOST";
     private static final String NODE_CONTROLLER_CONTAINER_PORT = "SP_NODE_CONTROLLER_CONTAINER_PORT";
 
-    public static void register(String serviceID, String host, int port,
-                                Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+    public static void register(String serviceID, String host, int port, List<String> supportedAppIds) {
         register(PE_TAG, makeSvcId(host, serviceID), host, port,
-                Arrays.asList(PE_TAG, SECONDARY_PE_IDENTIFIER_TAG), epaDeclarers);
+                Arrays.asList(PE_TAG, SECONDARY_PE_IDENTIFIER_TAG), supportedAppIds);
     }
 
     public static void register(String svcName, String svcId, String host, int port, List<String> tag,
-                                Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+                                List<String> supportedAppIds) {
         boolean connected = false;
 
         while (!connected) {
             LOG.info("Trying to register pipeline element container at node controller: " + makeRegistrationEndpoint());
-            String body = createSvcBody(svcName, svcId, host, port, tag, epaDeclarers);
+            String body = createSvcBody(svcName, svcId, host, port, tag, supportedAppIds);
             connected = registerSvcHttpClient(body);
 
             if (!connected) {
@@ -87,7 +86,7 @@ public class NodeControllerUtil {
     }
 
     private static String createSvcBody(String name, String id, String host, int port, List<String> tags,
-                                        Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+                                        List<String> supportedAppIds) {
         try {
             ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
             String healthCheckURL = HTTP_PROTOCOL + host + COLON + port;
@@ -101,7 +100,7 @@ public class NodeControllerUtil {
 
             InvocableRegistration svcBody = new InvocableRegistration();
             svcBody.setConsulServiceRegistrationBody(body);
-            svcBody.setSupportedPipelineElementAppIds(new ArrayList<>(epaDeclarers.keySet()));
+            svcBody.setSupportedPipelineElementAppIds(supportedAppIds);
 
             return JacksonSerializer.getObjectMapper().writeValueAsString(svcBody);
         } catch (JsonProcessingException e) {
diff --git a/streampipes-node-controller-container/pom.xml b/streampipes-node-controller-container/pom.xml
index d5e0f49..8b8b60f 100644
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@ -76,13 +76,6 @@
         </dependency>
 
         <!--external dependencies-->
-        <!-- parse yaml config -->
-<!--        <dependency>-->
-<!--            <groupId>org.yaml</groupId>-->
-<!--            <artifactId>snakeyaml</artifactId>-->
-<!--            <version>1.21</version>-->
-<!--        </dependency>-->
-        <!-- docker client for java -->
         <dependency>
             <groupId>com.spotify</groupId>
             <artifactId>docker-client</artifactId>
@@ -135,6 +128,11 @@
             <groupId>com.fasterxml.jackson.module</groupId>
             <artifactId>jackson-module-jaxb-annotations</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.mapdb</groupId>
+            <artifactId>mapdb</artifactId>
+            <version>3.0.8</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerDeploymentResource.java
similarity index 96%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerDeploymentResource.java
index 21733ef..b6899df 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerDeploymentResource.java
@@ -25,7 +25,7 @@ import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 
 @Path("/api/v2/node/container")
-public class ContainerResource extends AbstractResource {
+public class ContainerDeploymentResource extends AbstractResource {
 
     @GET
     @Produces(MediaType.APPLICATION_JSON)
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
index 8c593a4..6a4f88e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
@@ -25,7 +25,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-@Path("/")
+@Path("/healthy")
 public class HealthCheckResource extends AbstractResource {
 
     @GET
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 6028438..03a0a4f 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -25,7 +25,6 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
 import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
index ac277e8..893bcb3 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
@@ -29,6 +29,6 @@ public class NodeControllerResourceConfig extends ResourceConfig {
         register(InvocableEntityResource.class);
         register(DataStreamRelayResource.class);
         register(ConnectResource.class);
-        register(ContainerResource.class);
+        register(ContainerDeploymentResource.class);
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
index dc780a4..9937254 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
@@ -20,8 +20,10 @@ package org.apache.streampipes.node.controller.container.config;
 import org.apache.streampipes.config.SpConfig;
 import org.apache.streampipes.model.node.resources.fielddevice.FieldDeviceAccessResource;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public enum NodeControllerConfig {
     INSTANCE;
@@ -93,7 +95,6 @@ public enum NodeControllerConfig {
                 String.class);
     }
 
-    // TODO: should be flexibly set due to node broker technology used
     public int getNodeBrokerPort() {
         return getEnvOrDefault(
                 ConfigKeys.NODE_BROKER_CONTAINER_PORT,
@@ -102,15 +103,13 @@ public enum NodeControllerConfig {
     }
 
     public List<String> getNodeLocations() {
-        return System.getenv()
-                .entrySet()
-                .stream()
-                .filter(e -> (e.getKey().contains(ConfigKeys.NODE_LOCATION)))
-                .map(x ->  x.getKey().replace(ConfigKeys.NODE_LOCATION + "_", "").toLowerCase() + "=" + x.getValue())
+        return System.getenv().entrySet().stream()
+                .filter(e -> e.getKey().contains(ConfigKeys.NODE_LOCATION))
+                .map(v -> v.getValue().split(";"))
+                .flatMap(Stream::of)
                 .collect(Collectors.toList());
     }
 
-    // TODO: get supported PE programmatically instead of environment variables
     public List<String> getSupportedPipelineElements() {
         return System.getenv()
                 .entrySet()
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
similarity index 96%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
index afb797d..7091646 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
@@ -17,7 +17,7 @@
  */
 package org.apache.streampipes.node.controller.container.management;
 
-public interface IRunningInstances<T> {
+public interface RunningInstances<T> {
 
     void add(String id, T value);
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
index 3393a74..9b2e3a0 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
@@ -18,33 +18,39 @@
 package org.apache.streampipes.node.controller.container.management.orchestrator;
 
 import org.apache.streampipes.model.node.container.DockerContainer;
-import org.apache.streampipes.node.controller.container.management.IRunningInstances;
+import org.apache.streampipes.node.controller.container.management.RunningInstances;
+import org.apache.streampipes.node.controller.container.storage.MapDBImpl;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
-public enum RunningContainerInstances implements IRunningInstances<DockerContainer> {
+public enum RunningContainerInstances implements RunningInstances<DockerContainer> {
     INSTANCE;
 
-    private final Map<String, DockerContainer> runningInstances = new HashMap<>();
+    private final MapDBImpl mapDB;
+
+    RunningContainerInstances() {
+        this.mapDB = new MapDBImpl(new File("containers.db"));
+    }
 
     @Override
     public void add(String id, DockerContainer container) {
-        runningInstances.put(id, container);
+        mapDB.create(id, container);
     }
 
     @Override
     public boolean isRunning(String id) {
-        return runningInstances.get(id) != null;
+        return mapDB.retrieve(id) != null;
     }
 
     @Override
     public DockerContainer get(String id) {
-        return isRunning(id) ? runningInstances.get(id) : null;
+        return isRunning(id) ? mapDB.retrieve(id) : null;
     }
 
     @Override
     public void remove(String id) {
-        runningInstances.remove(id);
+        mapDB.delete(id);
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
index 3067f71..bfa68a1 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
@@ -18,33 +18,37 @@
 package org.apache.streampipes.node.controller.container.management.pe;
 
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.node.controller.container.management.IRunningInstances;
+import org.apache.streampipes.node.controller.container.storage.MapDBImpl;
+import org.apache.streampipes.node.controller.container.management.RunningInstances;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.io.File;
 
-public enum RunningInvocableInstances implements IRunningInstances<InvocableStreamPipesEntity> {
+public enum RunningInvocableInstances implements RunningInstances<InvocableStreamPipesEntity> {
     INSTANCE;
 
-    private final Map<String, InvocableStreamPipesEntity> runningInvocableInstances = new HashMap<>();
+    private final MapDBImpl mapDB;
+
+    RunningInvocableInstances() {
+        this.mapDB = new MapDBImpl(new File("invocables.db"));
+    }
 
     @Override
     public void add(String id, InvocableStreamPipesEntity value) {
-        runningInvocableInstances.put(id, value);
+        mapDB.create(id, value);
     }
 
     @Override
     public boolean isRunning(String id) {
-        return runningInvocableInstances.get(id) != null;
+        return mapDB.retrieve(id) != null;
     }
 
     @Override
     public InvocableStreamPipesEntity get(String id) {
-        return runningInvocableInstances.get(id);
+        return mapDB.retrieve(id);
     }
 
     @Override
     public void remove(String id) {
-        runningInvocableInstances.remove(id);
+        mapDB.delete(id);
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index e95ab57..085ed82 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -17,12 +17,12 @@
  */
 package org.apache.streampipes.node.controller.container.management.relay;
 
-import org.apache.streampipes.node.controller.container.management.IRunningInstances;
+import org.apache.streampipes.node.controller.container.management.RunningInstances;
 
 import java.util.*;
 import java.util.stream.Collectors;
 
-public enum RunningRelayInstances implements IRunningInstances<Map<String,EventRelay>> {
+public enum RunningRelayInstances implements RunningInstances<Map<String,EventRelay>> {
     INSTANCE;
 
     private final Map<String, Map<String, EventRelay>> runningInstances = new HashMap<>();
@@ -53,6 +53,5 @@ public enum RunningRelayInstances implements IRunningInstances<Map<String,EventR
                 .map(Map::values)
                 .flatMap(Collection::stream)
                 .collect(Collectors.toList());
-        //return new ArrayList<>(runningInstances.values().forEach(e -> e.values()));
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
similarity index 75%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
index 4afbf46..c588487 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
@@ -15,17 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.manager.execution.pipeline.migration;
+package org.apache.streampipes.node.controller.container.storage;
 
-public class PipelineElementStartCommand implements Command {
+import java.util.List;
 
-    @Override
-    public void execute() {
+public interface CRUDStorage {
+    <T> void create(String id, T value);
 
-    }
+    <T>T retrieve(String id);
 
-    @Override
-    public void rollback() {
+    <T> void update(String id, T value);
 
-    }
+    void delete(String id);
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
new file mode 100644
index 0000000..8483fbd
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.container.storage;
+
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.Serializer;
+
+import java.io.File;
+import java.util.concurrent.ConcurrentMap;
+
+/** {@link CRUDStorage} backed by a MapDB hash map named "nodectlcache" (String keys, Serializer.JAVA values). */
+public class MapDBImpl implements CRUDStorage {
+    private static final String DB_STORAGE_PATH = "/var/lib/streampipes/";
+
+    private final DB db;
+    private final ConcurrentMap<String, Object> map;
+
+    /** Opens an in-memory store if SP_DEBUG is "true", otherwise the file {@code dbFile} below DB_STORAGE_PATH. */
+    public MapDBImpl(File dbFile) {
+        // NOTE(review): transactions are not enabled - confirm a file store reopens after an unclean JVM exit.
+        db = ("true".equals(System.getenv("SP_DEBUG"))
+                ? DBMaker.memoryDB()
+                : DBMaker.fileDB(new File(DB_STORAGE_PATH, dbFile.getPath()).getPath()))
+                .closeOnJvmShutdown().make();
+        map = db.hashMap("nodectlcache", Serializer.STRING, Serializer.JAVA).createOrOpen();
+    }
+
+    /** Stores {@code value} under {@code id}, replacing any entry already present. */
+    @Override
+    public <T> void create(String id, T value) {
+        map.put(id, value);
+    }
+
+    /** Returns the value stored under {@code id}, or {@code null} if there is none. */
+    @Override
+    @SuppressWarnings("unchecked") // T is chosen by the caller; a mismatch surfaces as ClassCastException there
+    public <T> T retrieve(String id) {
+        return (T) map.get(id);
+    }
+
+    /** Stores {@code value} under {@code id}; same put semantics as {@link #create}. */
+    @Override
+    public <T> void update(String id, T value) {
+        map.put(id, value);
+    }
+
+    /** Removes the entry stored under {@code id}, if any. */
+    @Override
+    public void delete(String id) {
+        map.remove(id);
+    }
+
+    /** Closes the underlying MapDB instance. */
+    public void close() {
+        db.close();
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
index 57715b9..c6de47c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
@@ -130,11 +130,16 @@ public abstract class ElementSubmitter {
     private void rollbackInvokedEntities(PipelineOperationStatus status) {
         for (PipelineElementStatus s : status.getElementStatus()) {
             if (s.isSuccess()) {
-                Optional<InvocableStreamPipesEntity> graphs = findGraphs(s.getElementId());
+                Optional<InvocableStreamPipesEntity> graphs = compareAndFindGraphs(s.getElementId());
+                Optional<SpDataStreamRelayContainer> relays = compareAndFindRelays(s.getElementId());
                 graphs.ifPresent(graph -> {
-                    LOG.info("Rolling back element " + graph.getElementId());
+                    LOG.info("Rolling back element " + s.getElementId());
                     makeDetachHttpRequest(new InvocableEntityUrlGenerator(graph), graph);
                 });
+                relays.ifPresent(relay -> {
+                    LOG.info("Rolling back element " + s.getElementId());
+                    makeDetachHttpRequest(new StreamRelayEndpointUrlGenerator(relay), relay);
+                });
             }
         }
     }
@@ -148,10 +153,33 @@ public abstract class ElementSubmitter {
         return status;
     }
 
+    // Old: this filter does not work. It ends up comparing the endpoint of the primary pipeline element with the
+    // node controller (nodectl) endpoint, because HttpRequestBuilder stores the nodectl endpoint in PipelineOperationStatus.
+    @Deprecated
     private Optional<InvocableStreamPipesEntity> findGraphs(String elementId) {
         return graphs.stream().filter(i -> i.getBelongsTo().equals(elementId)).findFirst();
     }
 
+    /** Finds the first graph whose generated invoke endpoint equals {@code elementId}. */
+    private Optional<InvocableStreamPipesEntity> compareAndFindGraphs(String elementId) {
+        return graphs.stream().filter(candidate -> matchingEndpoints(candidate, elementId)).findFirst();
+    }
+
+    /** Finds the first relay whose generated invoke endpoint equals {@code elementId}. */
+    private Optional<SpDataStreamRelayContainer> compareAndFindRelays(String elementId) {
+        return relays.stream().filter(candidate -> matchingEndpoints(candidate, elementId)).findFirst();
+    }
+
+    /** True if {@code entity} is a graph or relay whose generated invoke endpoint equals {@code endpoint}. */
+    private boolean matchingEndpoints(NamedStreamPipesEntity entity, String endpoint) {
+        if (entity instanceof InvocableStreamPipesEntity) {
+            InvocableStreamPipesEntity graph = (InvocableStreamPipesEntity) entity;
+            return new InvocableEntityUrlGenerator(graph).generateInvokeEndpoint().equals(endpoint);
+        }
+        return entity instanceof SpDataStreamRelayContainer && new StreamRelayEndpointUrlGenerator(
+                (SpDataStreamRelayContainer) entity).generateInvokeEndpoint().equals(endpoint);
+    }
+
     protected boolean allInvocableEntitiesRunning(PipelineOperationStatus status) {
         return status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess);
     }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index 726d1a6..2285355 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -74,7 +74,7 @@ public class GraphSubmitter extends ElementSubmitter {
     return verifyPipelineOperationStatus(
             status,
             "Successfully started pipeline " + pipelineName,
-            "Could not start pipeline" + pipelineName,
+            "Could not start pipeline " + pipelineName,
             true);
   }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index e610d24..49cf71f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -80,7 +80,6 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
         this.relaysToBeDeleted = new ArrayList<>();
     }
 
-    // TODO: refactor!
     public PipelineOperationStatus migratePipelineElement() {
 
         PipelineOperationStatus status = initPipelineOperationStatus();
@@ -308,9 +307,17 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
     }
 
     private void findPredecessorsInMigrationPipeline(PipelineGraph pipelineGraphAfterMigration) {
-        PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).forEach(stream ->
-                predecessorsAfterMigration.addAll(getPredecessors(stream, migrationEntity.getTargetElement(),
-                        pipelineGraphAfterMigration, new ArrayList<>())));
+        // Gather the predecessors of the migration target over every stream of the migrated graph.
+        // distinct() keeps a single entry for entities that are reached via several streams.
+        List<NamedStreamPipesEntity> predecessors = PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration)
+                .stream()
+                .map(stream -> getPredecessors(stream, migrationEntity.getTargetElement(),
+                        pipelineGraphAfterMigration, new ArrayList<>()))
+                .flatMap(List::stream)
+                .distinct()
+                .collect(Collectors.toList());
+
+        predecessorsAfterMigration.addAll(predecessors);
     }
 
     private List<SpDataSet> findDataSets() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
deleted file mode 100644
index fd4bdd9..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.migration;
-
-public interface Command {
-    void execute();
-    void rollback();
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
deleted file mode 100644
index 5a2bbd6..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.migration;
-
-public class PipelineElementStopCommand implements Command {
-    @Override
-    public void execute() {
-
-    }
-
-    @Override
-    public void rollback() {
-
-    }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
deleted file mode 100644
index b2e242d..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.migration;
-
-public class RelayStartCommand implements Command {
-    @Override
-    public void execute() {
-
-    }
-
-    @Override
-    public void rollback() {
-
-    }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
deleted file mode 100644
index 1938268..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.migration;
-
-public class RelayStopCommand implements Command {
-    @Override
-    public void execute() {
-
-    }
-
-    @Override
-    public void rollback() {
-
-    }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
index dce1e06..0973c3e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
@@ -28,6 +28,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
 
 public abstract class AbstractClusterManager {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterManager.class.getCanonicalName());
@@ -36,8 +38,6 @@ public abstract class AbstractClusterManager {
     private static final String COLON = ":";
     private static final long RETRY_INTERVAL_MS = 5000;
     private static final int CONNECT_TIMEOUT = 1000;
-    private static final String BASE_NODE_CONTROLLER_INFO_ROUTE = "/api/v2/node/info";
-    private static final String BASE_NODE_CONTROLLER_RELAY_ROUTE = "/api/v2/node/stream/relay";
 
     public enum RequestOptions {
         GET,POST,PUT,DELETE
@@ -46,19 +46,55 @@ public abstract class AbstractClusterManager {
     protected static <T> boolean syncWithNodeController(T element, NodeSyncOptions sync) {
         switch (sync) {
             case ACTIVATE_NODE:
-                return sync(element, "/activate", RequestOptions.POST, false, NodeInfoDescription.class);
+                return sync(element, "/api/v2/node/info/activate", RequestOptions.POST, false);
             case DEACTIVATE_NODE:
-                return sync(element, "/deactivate", RequestOptions.POST, false, NodeInfoDescription.class);
+                return sync(element, "/api/v2/node/info/deactivate", RequestOptions.POST, false);
             case UPDATE_NODE:
-                return sync(element, "", RequestOptions.PUT, true, NodeInfoDescription.class);
+                return sync(element, "/api/v2/node/info", RequestOptions.PUT, true);
             case RESTART_RELAYS:
-                return sync(element, "/invoke", RequestOptions.POST, true, SpDataStreamRelayContainer.class);
+                return sync(element, "/api/v2/node/stream/relay/invoke", RequestOptions.POST, true);
+            case HEALTHY:
+                return healthCheck(element, "/healthy");
             default:
                 return false;
         }
     }
 
-    private static <T> boolean sync(T element, String subroute, RequestOptions request, boolean withBody, Class<?> type) {
+    /**
+     * Checks whether the node controller of the given node accepts TCP connections.
+     * Makes up to five attempts and pauses one second before every retry.
+     *
+     * @param element expected to be a {@link NodeInfoDescription}; it is cast without a prior type check
+     * @param route route appended to the node controller address; only used in the final log message
+     * @return true as soon as one connection attempt succeeds, false otherwise
+     */
+    private static <T> boolean healthCheck(T element, String route) {
+        String url = generateEndpoint(element, route);
+        NodeInfoDescription desc = (NodeInfoDescription) element;
+        boolean isAlive = false;
+        int retries = 5;
+        for (int i = 0; i < retries && !isAlive; i++) {
+            LOG.info("Trying to health check node controller={} ({}/{})", desc.getNodeControllerId(), i + 1, retries);
+            // try-with-resources closes the socket even when connect() throws
+            try (Socket socket = new Socket()) {
+                if (i > 0) {
+                    Thread.sleep(1000);
+                }
+                socket.connect(new InetSocketAddress(desc.getHostname(), desc.getPort()), 500);
+                isAlive = true;
+            } catch (IOException ignored) {
+                // node controller not reachable on this attempt, try again
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        LOG.info(isAlive ? "Successfully health check node controller=" + url :
+                "Could not perform health check node with controller=" + url);
+        return isAlive;
+    }
+
+    private static <T> boolean sync(T element, String route, RequestOptions request, boolean withBody) {
         boolean synced = false;
 
         String body = "{}";
@@ -66,7 +102,7 @@ public abstract class AbstractClusterManager {
             body = jackson(element);
         }
 
-        String url = generateEndpoint(element, subroute, type);
+        String url = generateEndpoint(element, route);
         LOG.info("Trying to sync with node controller=" + url);
 
         boolean connected = false;
@@ -93,23 +129,21 @@ public abstract class AbstractClusterManager {
 
     // Helpers
 
-    private static <T> String generateEndpoint(T desc, String subroute, Class<?> type) {
-        if (type.equals(NodeInfoDescription.class)) {
+    private static <T> String generateEndpoint(T desc, String route) {
+        if (desc instanceof NodeInfoDescription) {
             NodeInfoDescription d = (NodeInfoDescription) desc;
             return PROTOCOL
                     + d.getHostname()
                     + COLON
                     + d.getPort()
-                    + BASE_NODE_CONTROLLER_INFO_ROUTE
-                    + subroute;
+                    + route;
         } else {
             SpDataStreamRelayContainer d = (SpDataStreamRelayContainer) desc;
             return PROTOCOL
                     + d.getDeploymentTargetNodeHostname()
                     + COLON
                     + d.getDeploymentTargetNodePort()
-                    + BASE_NODE_CONTROLLER_RELAY_ROUTE
-                    + subroute;
+                    + route;
         }
     }
 
@@ -121,6 +155,16 @@ public abstract class AbstractClusterManager {
         }
     }
 
+    /** Issues a GET request to the given URL and returns the response content via its toString(). */
+    private static String get(String url) {
+        try {
+            return Request.Get(url).connectTimeout(CONNECT_TIMEOUT).execute().returnContent().toString();
+        } catch (IOException e) {
+            // rethrow the checked I/O failure as an unchecked exception, keeping the cause
+            throw new SpRuntimeException("Something went wrong during GET request to node controller", e);
+        }
+    }
+
     private static boolean put(String url, String body) {
         try {
             Request.Put(url)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
index 007dc15..5b85135 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
@@ -28,21 +28,25 @@ import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class NodeClusterManager extends AbstractClusterManager {
     private static final Logger LOG = LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
 
-
     public static List<NodeInfoDescription> getAvailableNodes() {
-        //return new AvailableNodesFetcher().fetchNodes();
-        return getNodeStorageApi().getAllActiveNodes();
+        // A node is reported as available only if storage lists it as active AND
+        // the HEALTHY sync with its node controller succeeds.
+        // The health check runs once per active node on every call.
+        return getNodeStorageApi().getAllActiveNodes()
+                .stream()
+                .filter(node -> syncWithNodeController(node, NodeSyncOptions.HEALTHY))
+                .collect(Collectors.toCollection(ArrayList::new));
     }
 
-
     public static List<NodeInfoDescription> getAllNodes() {
-        //return new AvailableNodesFetcher().fetchNodes();
         return getNodeStorageApi().getAllNodes();
     }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
index 644e688..5ba7567 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
@@ -18,5 +18,5 @@
 package org.apache.streampipes.manager.node;
 
 public enum NodeSyncOptions {
-    ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE,RESTART_RELAYS;
+    ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE, RESTART_RELAYS, HEALTHY;
 }
diff --git a/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.html b/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.html
index 6712a6e..37b068f 100644
--- a/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.html
+++ b/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.html
@@ -16,13 +16,6 @@
   ~
   -->
 
-<!--<div class="sp-dialog-container">-->
-<!--    <div class="sp-dialog-content padding-20">-->
-<!--        <div fxFlex="100" fxLayout="column">-->
-<!--            <p>node-settings works!</p>-->
-<!--        </div>-->
-<!--    </div>-->
-<!--</div>-->
 <div class="sp-dialog-container">
     <div class="sp-dialog-content padding-20">
         <div fxFlex="100" fxLayout="column">
diff --git a/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts b/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts
index ff2579d..bd9d555 100644
--- a/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts
+++ b/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts
@@ -17,11 +17,16 @@
  */
 
 import {Component, Input, OnInit} from '@angular/core';
-import {NodeInfoDescription} from "../../../core-model/gen/streampipes-model";
+import {Message, NodeInfoDescription, PipelineOperationStatus} from "../../../core-model/gen/streampipes-model";
 import {FormGroup} from "@angular/forms";
 import {DialogRef} from "../../../core-ui/dialog/base-dialog/dialog-ref";
 import {MatChipInputEvent} from "@angular/material/chips";
 import {COMMA, ENTER} from "@angular/cdk/keycodes";
+import {NodeService} from "../../../platform-services/apis/node.service";
+import {PipelineStatusDialogComponent} from "../../../pipelines/dialog/pipeline-status/pipeline-status-dialog.component";
+import {PanelType} from "../../../core-ui/dialog/base-dialog/base-dialog.model";
+import {DialogService} from "../../../core-ui/dialog/base-dialog/base-dialog.service";
+import {MatSnackBar} from "@angular/material/snack-bar";
 
 export interface Fruit {
   name: string;
@@ -38,7 +43,7 @@ export class NodeConfigurationDetailsComponent implements OnInit {
   saving: boolean = false;
   saved: boolean = false;
   advancedSettings: boolean;
-
+  errorMessage: string = '';
   visible = true;
   selectable = true;
   removable = true;
@@ -49,7 +54,9 @@ export class NodeConfigurationDetailsComponent implements OnInit {
   @Input()
   node: NodeInfoDescription;
 
-  constructor(private dialogRef: DialogRef<NodeConfigurationDetailsComponent>) {
+  constructor(private nodeService: NodeService,
+              private dialogRef: DialogRef<NodeConfigurationDetailsComponent>,
+              private _snackBar: MatSnackBar) {
   }
 
   ngOnInit(): void {
@@ -58,13 +65,32 @@ export class NodeConfigurationDetailsComponent implements OnInit {
   }
 
   updateNodeInfo() {
-
+    let updateRequest;
+    this.node.staticNodeMetadata.locationTags = this.tmpTags;
+    updateRequest = this.nodeService.updateNodeState(this.node);
+
+    updateRequest.subscribe(statusMessage => {
+      if (statusMessage.success) {
+        this.openSnackBar("Node successfully updated");
+        this.hide();
+      } else {
+        this.openSnackBar("Node not updated");
+      }
+    // Without an error callback a failed request would leave the user without any feedback.
+    }, () => this.openSnackBar("Node not updated"));
   }
 
   hide() {
     this.dialogRef.close();
   }
 
+  openSnackBar(message: string) {
+    // Show the message in a snack bar with a "close" action and a duration of 3000.
+    const config = {duration: 3000};
+    this._snackBar.open(message, "close", config);
+  }
+
+
   add(event: MatChipInputEvent): void {
     const input = event.input;
     const value = event.value;