You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/01/12 12:12:41 UTC

[incubator-streampipes] 01/02: [WIP] initial work on resource requirement defition in SDK and core

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 732915a4d0ae68fc5bd10cb6dcaf5c47ef2c0c84
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Sun Jan 10 00:37:27 2021 +0100

    [WIP] initial work on resource requirement defition in SDK and core
---
 .../api/AbstractPipelineElementResource.java       |   1 -
 .../model/base/ConsumableStreamPipesEntity.java    |  19 ++++
 .../model/base/InvocableStreamPipesEntity.java     |  17 +++
 .../model/graph/DataProcessorInvocation.java       |   4 +
 .../model/graph/DataSinkInvocation.java            |   1 +
 .../model/graph/PipelineElementContainer.java      |   9 ++
 .../model/message/NotificationType.java            |   3 +
 .../model/node/NodeInfoDescription.java            |   6 +-
 .../streampipes/model/node/meta/GeoLocation.java   |   1 +
 .../GeoLocation.java => resource/Hardware.java}    |  41 +++----
 .../NodeResourceRequirement.java}                  |  47 +++-----
 .../org/apache/streampipes/model/util/Cloner.java  |  18 ++-
 .../controller/container/NodeControllerInit.java   |  36 +++---
 .../api/DataProcessorPipelineElementResource.java  |  14 +--
 .../api/DataSinkPipelineElementResource.java       |  14 +--
 .../container/api/InfoStatusResource.java          |  16 +++
 .../container/api/InvocableEntityResource.java     |  36 +++---
 .../api/NodeControllerResourceConfig.java          |   3 +-
 .../container/management/node/NodeManager.java     |  99 ++++++++++++----
 .../management/pe/InvocableElementManager.java     |   2 +-
 ...ifeCycle.java => PipelineElementLifeCycle.java} |   2 +-
 .../manager/node/AbstractClusterManager.java       | 124 +++++++++++++++++++++
 .../manager/node/NodeClusterManager.java           | 107 ++++++++++--------
 .../streampipes/manager/operations/Operations.java |  42 -------
 .../manager/setup/CouchDbInstallationStep.java     |   3 +-
 .../verification/extractor/TypeExtractor.java      |  33 ++++--
 .../org/apache/streampipes/rest/api/INode.java     |   2 +
 .../org/apache/streampipes/rest/impl/Node.java     |  49 +++++---
 .../impl/nouser/PipelineElementImportNoUser.java   |   2 +-
 .../builder/AbstractProcessingElementBuilder.java  |  19 +++-
 .../sdk/builder/ProcessingElementBuilder.java      |   1 +
 .../sdk/builder/ResourceRequirementsBuilder.java   |  30 +++--
 .../sdk/helpers/CollectedResourceRequirements.java |  23 ++++
 ...irements.java => NodeResourceRequirements.java} |  10 +-
 .../serializers/json/GsonSerializer.java           |   2 +
 .../jsonld/CustomAnnotationProvider.java           |   4 +-
 .../streampipes/storage/api/INodeInfoStorage.java  |   4 +
 .../storage/couchdb/impl/NodeInfoStorageImpl.java  |  28 ++++-
 .../apache/streampipes/vocabulary/StreamPipes.java |   5 +
 .../node-configuration.component.ts                |   8 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  56 ++++++++--
 .../save-pipeline/save-pipeline.component.ts       |   4 +-
 .../pipeline-overview.component.html               |   2 +-
 ui/src/app/platform-services/apis/node.service.ts  |  14 +++
 44 files changed, 683 insertions(+), 278 deletions(-)

diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
index 1797897..ba6f338 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
@@ -148,7 +148,6 @@ public abstract class AbstractPipelineElementResource<D extends Declarer<?>> {
       String originalId = desc.getElementId();
       String uri = DeclarersSingleton.getInstance().getBaseUri() + type + desc.getElementId();
       desc.setElementId(uri);
-      desc.setElementId(uri);
 
       // TODO remove after full internationalization support has been implemented
       if (desc.isIncludesLocales()) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
index 6faf938..f7936fb 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.model.base;
 import io.fogsy.empire.annotations.RdfProperty;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.model.util.Cloner;
 import org.apache.streampipes.vocabulary.StreamPipes;
@@ -46,6 +47,11 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
   @RdfProperty(StreamPipes.HAS_STATIC_PROPERTY)
   protected List<StaticProperty> staticProperties;
 
+  @OneToMany(fetch = FetchType.EAGER,
+          cascade = {CascadeType.ALL})
+  @RdfProperty(StreamPipes.HAS_NODE_RESOURCE_PROPERTY)
+  protected List<NodeResourceRequirement> resourceRequirements;
+
   @OneToOne(fetch = FetchType.EAGER,
           cascade = {CascadeType.ALL})
   @RdfProperty(StreamPipes.SUPPORTED_GROUNDING)
@@ -64,12 +70,14 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
     super();
     this.spDataStreams = new ArrayList<>();
     this.staticProperties = new ArrayList<>();
+    this.resourceRequirements = new ArrayList<>();
   }
 
   public ConsumableStreamPipesEntity(String uri, String name, String description, String iconUrl) {
     super(uri, name, description, iconUrl);
     this.spDataStreams = new ArrayList<>();
     this.staticProperties = new ArrayList<>();
+    this.resourceRequirements = new ArrayList<>();
   }
 
   public ConsumableStreamPipesEntity(ConsumableStreamPipesEntity other) {
@@ -84,6 +92,9 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
     if (other.getSupportedGrounding() != null) {
       this.supportedGrounding = new EventGrounding(other.getSupportedGrounding());
     }
+    if (other.getResourceRequirements() != null) {
+      this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
+    }
   }
 
   public List<SpDataStream> getSpDataStreams() {
@@ -137,4 +148,12 @@ public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity
   public void setElementEndpointServiceName(String elementEndpointServiceName) {
     this.elementEndpointServiceName = elementEndpointServiceName;
   }
+
+  public List<NodeResourceRequirement> getResourceRequirements() {
+    return resourceRequirements;
+  }
+
+  public void setResourceRequirements(List<NodeResourceRequirement> resourceRequirements) {
+    this.resourceRequirements = resourceRequirements;
+  }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 8d38c28..457bfe1 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.model.util.Cloner;
 import org.apache.streampipes.vocabulary.StreamPipes;
@@ -72,6 +73,11 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
   @RdfProperty(StreamPipes.REQUIRES_STREAM)
   private List<SpDataStream> streamRequirements;
 
+  @OneToMany(fetch = FetchType.EAGER,
+          cascade = {CascadeType.ALL})
+  @RdfProperty(StreamPipes.REQUIRES_RESOURCES)
+  protected List<NodeResourceRequirement> resourceRequirements;
+
   @RdfProperty(StreamPipes.ELEMENT_ENDPOINT_HOSTNAME)
   private String elementEndpointHostname;
 
@@ -127,6 +133,9 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
     if (other.getSupportedGrounding() != null) {
       this.supportedGrounding = new EventGrounding(other.getSupportedGrounding());
     }
+    if (other.getResourceRequirements() != null) {
+      this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
+    }
   }
 
   public InvocableStreamPipesEntity(String uri, String name, String description, String iconUrl) {
@@ -279,4 +288,12 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
   public void setElementEndpointServiceName(String elementEndpointServiceName) {
     this.elementEndpointServiceName = elementEndpointServiceName;
   }
+
+  public List<NodeResourceRequirement> getResourceRequirements() {
+    return resourceRequirements;
+  }
+
+  public void setResourceRequirements(List<NodeResourceRequirement> resourceRequirements) {
+    this.resourceRequirements = resourceRequirements;
+  }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index 016f3c7..5c3236b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -86,6 +86,7 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
 
     this.setOutputStreamRelays(sepa.getOutputStreamRelays());
     this.setEventRelayStrategy(sepa.getEventRelayStrategy());
+    this.setResourceRequirements(sepa.getResourceRequirements());
 
     //this.setUri(belongsTo +"/" +getElementId());
   }
@@ -102,6 +103,9 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
     this.pathName = other.getPathName();
     this.eventRelayStrategy = other.getEventRelayStrategy();
     this.category = new Cloner().epaTypes(other.getCategory());
+    if (other.getResourceRequirements() != null) {
+      this.resourceRequirements = new Cloner().resourceRequirements(other.getResourceRequirements());
+    }
   }
 
   public DataProcessorInvocation(DataProcessorDescription sepa, String domId) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
index 17449f7..b4ced40 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
@@ -65,6 +65,7 @@ public class DataSinkInvocation extends InvocableStreamPipesEntity {
     this.setElementEndpointServiceName(sec.getElementEndpointServiceName());
     this.setElementEndpointHostname(sec.getElementEndpointHostname());
     this.setElementEndpointPort(sec.getElementEndpointPort());
+    this.setResourceRequirements(sec.getResourceRequirements());
     //this.setUri(belongsTo +"/" +getElementId());
   }
 
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/PipelineElementContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/PipelineElementContainer.java
index 47561c8..4f031ea 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/PipelineElementContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/PipelineElementContainer.java
@@ -39,6 +39,15 @@ public class PipelineElementContainer extends NamedStreamPipesEntity {
   @RdfProperty(StreamPipes.HAS_PIPELINE_ELEMENTS)
   private List<NamedStreamPipesEntity> pipelineElementDescriptions;
 
+  @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_ID)
+  private String deploymentTargetNodeId;
+
+  @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_HOSTNAME)
+  private String deploymentTargetNodeHostname;
+
+  @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_PORT)
+  private Integer deploymentTargetNodePort;
+
   public PipelineElementContainer(PipelineElementContainer other) {
     super(other);
     if (other.getPipelineElementDescriptions() != null) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
index e8720c4..4a1c81e 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
@@ -60,6 +60,9 @@ public enum NotificationType {
 	REMOVED_ACTION("Action removed", ""),
 	REMOVED_SOURCE("Source removed", ""),
 	REMOVED_SEPA("Sepa removed", ""),
+	REMOVED_NODE("Node removed", ""),
+
+	NODE_STATE_UPDATE_ERROR("Node state update error", "Node state could not be changed."),
 
 	ADDED_CONFIGURATION("Configuration added", ""), 
 	INSTALLATION_SUCCESSFUL("Installation successful", ""), 
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java
index 1d87f86..a41db8c 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoDescription.java
@@ -30,12 +30,16 @@ import org.apache.streampipes.vocabulary.StreamPipes;
 
 import javax.persistence.*;
 import java.util.List;
+import java.util.UUID;
 
 @RdfsClass(StreamPipes.NODE_INFO_DESCRIPTION)
 @Entity
 @TsModel
 public class NodeInfoDescription extends UnnamedStreamPipesEntity {
 
+    private static final long serialVersionUID = 4294360613297596807L;
+    protected static final String prefix = "urn:streampipes.org:nid:";
+
     @JsonProperty("_id")
     private @SerializedName("_id") String id;
 
@@ -78,7 +82,7 @@ public class NodeInfoDescription extends UnnamedStreamPipesEntity {
     private List<String> supportedElements;
 
     public NodeInfoDescription() {
-        super();
+        super(prefix + UUID.randomUUID().toString());
     }
 
     public NodeInfoDescription(String elementId) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
index 6d2aeef..2ec7053 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
@@ -21,6 +21,7 @@ import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
 import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
 import org.apache.streampipes.model.shared.annotation.TsModel;
+import org.apache.streampipes.vocabulary.Geo;
 import org.apache.streampipes.vocabulary.StreamPipes;
 
 import javax.persistence.Entity;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/Hardware.java
similarity index 57%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/resource/Hardware.java
index 6d2aeef..1d88228 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/Hardware.java
@@ -15,50 +15,43 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.model.node.meta;
+package org.apache.streampipes.model.resource;
+
 
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
-import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
 import org.apache.streampipes.model.shared.annotation.TsModel;
 import org.apache.streampipes.vocabulary.StreamPipes;
 
 import javax.persistence.Entity;
 
-
-@RdfsClass(StreamPipes.GEO_LOCATION)
+@RdfsClass(StreamPipes.HARDWARE_REQUIREMENT)
 @Entity
 @TsModel
-public class GeoLocation extends UnnamedStreamPipesEntity {
-
-    @RdfProperty(StreamPipes.GEO_LOCATION_LATITUDE)
-    private double latitude;
+public class Hardware extends NodeResourceRequirement {
 
-    @RdfProperty(StreamPipes.GEO_LOCATION_LONGITUDE)
-    private double longitude;
+    @RdfProperty(StreamPipes.HAS_GPU_REQUIREMENT)
+    private boolean gpu;
 
-    public GeoLocation() {
+    public Hardware() {
         super();
     }
 
-    public GeoLocation(double latitude, double longitude) {
-        this.latitude = latitude;
-        this.longitude = longitude;
-    }
-
-    public double getLatitude() {
-        return latitude;
+    public Hardware(Hardware other) {
+        super(other);
+        this.gpu = other.isGpu();
     }
 
-    public void setLatitude(double latitude) {
-        this.latitude = latitude;
+    public Hardware(Hardware other, boolean gpu) {
+        super(other);
+        this.gpu = gpu;
     }
 
-    public double getLongitude() {
-        return longitude;
+    public boolean isGpu() {
+        return gpu;
     }
 
-    public void setLongitude(double longitude) {
-        this.longitude = longitude;
+    public void setGpu(boolean gpu) {
+        this.gpu = gpu;
     }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/NodeResourceRequirement.java
similarity index 56%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/resource/NodeResourceRequirement.java
index 6d2aeef..8847434 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/meta/GeoLocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/resource/NodeResourceRequirement.java
@@ -15,50 +15,35 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.model.node.meta;
+package org.apache.streampipes.model.resource;
 
-import io.fogsy.empire.annotations.RdfProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
 import io.fogsy.empire.annotations.RdfsClass;
 import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
 import org.apache.streampipes.model.shared.annotation.TsModel;
 import org.apache.streampipes.vocabulary.StreamPipes;
 
-import javax.persistence.Entity;
 
+import javax.persistence.Entity;
+import javax.persistence.MappedSuperclass;
+import java.util.UUID;
 
-@RdfsClass(StreamPipes.GEO_LOCATION)
+@RdfsClass(StreamPipes.NODE_RESOURCE_REQUIREMENT)
 @Entity
 @TsModel
-public class GeoLocation extends UnnamedStreamPipesEntity {
-
-    @RdfProperty(StreamPipes.GEO_LOCATION_LATITUDE)
-    private double latitude;
-
-    @RdfProperty(StreamPipes.GEO_LOCATION_LONGITUDE)
-    private double longitude;
-
-    public GeoLocation() {
-        super();
-    }
+@JsonSubTypes({
+        @JsonSubTypes.Type(Hardware.class)
+})
+public abstract class NodeResourceRequirement extends UnnamedStreamPipesEntity {
 
-    public GeoLocation(double latitude, double longitude) {
-        this.latitude = latitude;
-        this.longitude = longitude;
-    }
-
-    public double getLatitude() {
-        return latitude;
-    }
-
-    public void setLatitude(double latitude) {
-        this.latitude = latitude;
-    }
+    private static final long serialVersionUID = -8700750792058131323L;
+    protected static final String prefix = "urn:streampipes.org:nrr:";
 
-    public double getLongitude() {
-        return longitude;
+    public NodeResourceRequirement() {
+        super(prefix + UUID.randomUUID().toString());
     }
 
-    public void setLongitude(double longitude) {
-        this.longitude = longitude;
+    public NodeResourceRequirement(NodeResourceRequirement other) {
+        super(other);
     }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
index c02d0ff..26c6642 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
@@ -20,8 +20,9 @@ package org.apache.streampipes.model.util;
 
 import org.apache.streampipes.model.SpDataStreamRelay;
 import org.apache.streampipes.model.grounding.*;
-import org.apache.streampipes.model.node.NodeBrokerDescription;
 import org.apache.streampipes.model.output.*;
+import org.apache.streampipes.model.resource.Hardware;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
 import org.apache.streampipes.model.staticproperty.*;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.slf4j.Logger;
@@ -334,4 +335,19 @@ public class Cloner {
       return ad;
     }
   }
+
+  public NodeResourceRequirement cloneResourceRequirements(NodeResourceRequirement o) {
+    if (o instanceof Hardware) {
+      return new Hardware((Hardware) o);
+    } else {
+      LOG.error("Could not clone node resource requirement of type: " + o.getClass().getCanonicalName());
+      return o;
+    }
+  }
+
+  public List<NodeResourceRequirement> resourceRequirements(List<NodeResourceRequirement> nrp) {
+    return nrp.stream()
+            .map(this::cloneResourceRequirements)
+            .collect(Collectors.toList());
+  }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
index 48e2e88..ca50136 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.node.controller.container;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.util.ConsulUtil;
 import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerManager;
 import org.apache.streampipes.node.controller.container.api.NodeControllerResourceConfig;
@@ -50,26 +51,31 @@ public class NodeControllerInit {
         app.setDefaultProperties(Collections.singletonMap("server.port", conf.getNodeControllerPort()));
         app.run();
 
-        LOG.info("Load node info");
+        LOG.info("Load node info description");
         NodeManager.getInstance().init();
 
-        LOG.info("Start Node resource manager");
-        ResourceManager.getInstance().run();
+        LOG.info("Register node controller at backend");
+        boolean success = NodeManager.getInstance().register();
 
-        if (!"true".equals(System.getenv("SP_DEBUG"))) {
-            LOG.info("Auto-deploy StreamPipes node container");
-            DockerContainerManager.getInstance().init();
+        if (success) {
+            LOG.info("Start resource manager");
+            ResourceManager.getInstance().run();
 
-            LOG.info("Start Janitor manager");
-            JanitorManager.getInstance().run();
-        }
+            if (!"true".equals(System.getenv("SP_DEBUG"))) {
+                LOG.info("Auto-deploy extensions container");
+                DockerContainerManager.getInstance().init();
 
-        // registration with consul here
-        ConsulUtil.registerNodeService(
-                conf.getNodeServiceId(),
-                conf.getNodeHostName(),
-                conf.getNodeControllerPort()
-        );
+                LOG.info("Start janitor manager");
+                JanitorManager.getInstance().run();
+            }
+
+            // registration with consul here
+            ConsulUtil.registerNodeService(
+                    conf.getNodeServiceId(),
+                    conf.getNodeHostName(),
+                    conf.getNodeControllerPort()
+            );
+        } else throw new SpRuntimeException("Could not register node controller at backend");
     }
 
     @PreDestroy
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
index 6e5ebb0..bb186e8 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
@@ -21,10 +21,10 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
 
 import javax.ws.rs.Path;
 
-@Path("/api/v2/node/element/sepa")
-public class DataProcessorPipelineElementResource extends InvocableEntityResource<DataProcessorInvocation> {
-
-    public DataProcessorPipelineElementResource() {
-        super(DataProcessorInvocation.class);
-    }
-}
+//@Path("/api/v2/node/element/sepa")
+//public class DataProcessorPipelineElementResource extends InvocableEntityResource<DataProcessorInvocation> {
+//
+//    public DataProcessorPipelineElementResource() {
+//        super(DataProcessorInvocation.class);
+//    }
+//}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
index af99b25..c404896 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
@@ -21,10 +21,10 @@ import org.apache.streampipes.model.graph.DataSinkInvocation;
 
 import javax.ws.rs.Path;
 
-@Path("/api/v2/node/element/sec")
-public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-
-    public DataSinkPipelineElementResource() {
-        super(DataSinkInvocation.class);
-    }
-}
+//@Path("/api/v2/node/element/sec")
+//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
+//
+//    public DataSinkPipelineElementResource() {
+//        super(DataSinkInvocation.class);
+//    }
+//}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
index b3a01f2..7e7b36a 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
@@ -30,6 +30,9 @@ import javax.ws.rs.core.Response;
 @Path("/api/v2/node/info")
 public class InfoStatusResource extends AbstractResource {
 
+    private static final String ACTIVATE = "activate";
+    private static final String DEACTIVATE = "deactivate";
+
     @GET
     @Produces(MediaType.APPLICATION_JSON)
     public Response getNodeInfo() {
@@ -43,6 +46,19 @@ public class InfoStatusResource extends AbstractResource {
         return ok(NodeManager.getInstance().updateNodeInfoDescription(desc));
     }
 
+    @POST
+    @Path("{action}")
+    @JacksonSerialized
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response activateNode(@PathParam("action") String action) {
+        if (action.equals(ACTIVATE)) {
+            return ok(NodeManager.getInstance().activate());
+        } else if (action.equals(DEACTIVATE)) {
+            return ok(NodeManager.getInstance().deactivate());
+        } else return fail();
+    }
+
     @GET
     @Path("/resources")
     @Produces(MediaType.APPLICATION_JSON)
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 0c71b57..2b779c5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -23,10 +23,10 @@ import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
 import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
 import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
+import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,31 +34,30 @@ import org.slf4j.LoggerFactory;
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 
-public abstract class InvocableEntityResource<I extends InvocableStreamPipesEntity> extends AbstractResource {
+@Path("/api/v2/node/element")
+public class InvocableEntityResource extends AbstractResource {
     private static final Logger LOG = LoggerFactory.getLogger(InvocableEntityResource.class.getCanonicalName());
-
     private static final String SLASH = "/";
-
-    protected Class<I> clazz;
-
-    public InvocableEntityResource(Class<I> clazz) {
-        this.clazz = clazz;
-    }
+    private static final String DATA_PROCESSOR_PREFIX = "sepa";
+    private static final String DATA_SINK_PREFIX = "sec";
 
     @POST
     @Path("/register")
+    @JacksonSerialized
+    @Consumes(MediaType.APPLICATION_JSON)
     public void register(InvocableRegistration registration) {
         InvocableElementManager.getInstance().register(registration);
     }
 
     @POST
-    @Path("{elementId}")
+    @Path("{identifier}/{elementId}")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public javax.ws.rs.core.Response invoke(@PathParam("elementId") String elementId, I graph) {
+    public javax.ws.rs.core.Response invoke(@PathParam("identifier") String identifier,
+                                            @PathParam("elementId") String elementId, InvocableStreamPipesEntity graph) {
         String endpoint;
 
-        if (graph instanceof DataProcessorInvocation) {
+        if (identifier.equals(DATA_PROCESSOR_PREFIX)) {
             endpoint = graph.getBelongsTo();
             DataStreamRelayManager.getInstance().startPipelineElementDataStreamRelay((DataProcessorInvocation) graph);
             Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
@@ -69,7 +68,7 @@ public abstract class InvocableEntityResource<I extends InvocableStreamPipesEnti
         }
         // Currently no data sinks are registered at node controller. If we, at some point, want to also run data
         // sinks on edge nodes we need to register there Declarer at the node controller one startup.
-        else if (graph instanceof DataSinkInvocation) {
+        else if (identifier.equals(DATA_SINK_PREFIX)) {
             endpoint = graph.getBelongsTo();
             Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
             if (resp.isSuccess()) {
@@ -82,12 +81,11 @@ public abstract class InvocableEntityResource<I extends InvocableStreamPipesEnti
     }
 
     @DELETE
-    @Path("{elementId}/{runningInstanceId}")
+    @Path("{identifier}/{elementId}/{runningInstanceId}")
     @Produces(MediaType.APPLICATION_JSON)
-    public javax.ws.rs.core.Response detach(@PathParam("elementId") String elementId,
-                         @PathParam("runningInstanceId") String runningInstanceId) {
-        LOG.info("receive stop request elementId={}, runningInstanceId={}", elementId, runningInstanceId);
-
+    public javax.ws.rs.core.Response detach(@PathParam("identifier") String identifier,
+                                            @PathParam("elementId") String elementId,
+                                            @PathParam("runningInstanceId") String runningInstanceId) {
         String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
         Response resp = InvocableElementManager.getInstance().detach(endpoint + SLASH + runningInstanceId);
         RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
@@ -96,7 +94,7 @@ public abstract class InvocableEntityResource<I extends InvocableStreamPipesEnti
         return ok(resp);
     }
 
-    private String toJson(I graph) {
+    private String toJson(InvocableStreamPipesEntity graph) {
         try {
             return JacksonSerializer.getObjectMapper().writeValueAsString(graph);
         } catch (JsonProcessingException e) {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
index 8db6aba..5fcd3dc 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
@@ -26,8 +26,7 @@ public class NodeControllerResourceConfig extends ResourceConfig {
     public NodeControllerResourceConfig() {
         register(HealthCheckResource.class);
         register(InfoStatusResource.class);
-        register(DataProcessorPipelineElementResource.class);
-        register(DataSinkPipelineElementResource.class);
+        register(InvocableEntityResource.class);
         register(AdapterDataStreamRelayResource.class);
         register(ConnectResource.class);
         register(ContainerResource.class);
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
index c468440..a597a03 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
@@ -18,7 +18,12 @@
 package org.apache.streampipes.node.controller.container.management.node;
 
 import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.message.NotificationType;
+import org.apache.streampipes.model.message.Notifications;
+import org.apache.streampipes.model.message.SuccessMessage;
 import org.apache.streampipes.model.node.*;
 import org.apache.streampipes.model.node.meta.GeoLocation;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
@@ -31,6 +36,14 @@ import java.io.IOException;
 public class NodeManager {
     private static final Logger LOG = LoggerFactory.getLogger(NodeManager.class.getCanonicalName());
 
+    private static final String PROTOCOL = "http://";
+    private static final String SLASH = "/";
+    private static final String COLON = ":";
+    private static final String BACKEND_BASE_ROUTE = "/streampipes-backend";
+    private static final String NODE_REGISTRATION_ROUTE = "/api/v2/users/admin@streampipes.org/nodes";
+    private static final long RETRY_INTERVAL_MS = 5000;
+    private static final int CONNECT_TIMEOUT_MS = 10000;
+
     private NodeInfoDescription nodeInfo = new NodeInfoDescription();
 
     private static NodeManager instance = null;
@@ -78,38 +91,76 @@ public class NodeManager {
                                 .build())
                         .build();
 
-        NodeManager.getInstance().add(nodeInfoDescription);
-
-        register(nodeInfoDescription);
+        add(nodeInfoDescription);
     }
 
-    private void register(NodeInfoDescription desc) {
-
-        String url =
-                "http://"
-                + NodeControllerConfig.INSTANCE.getBackendHost()
-                + ":"
-                + NodeControllerConfig.INSTANCE.getBackendPort()
-                + "/"
-                + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes";
-
+    public boolean register() {
+        boolean connected = false;
         try {
-            String nodeInfoDescription = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
-
-            Request.Post(url)
-                    .bodyString(nodeInfoDescription, ContentType.APPLICATION_JSON)
-                    .connectTimeout(1000)
-                    .socketTimeout(100000)
-                    .execute().returnContent().asString();
-
+            String body = JacksonSerializer.getObjectMapper().writeValueAsString(this.nodeInfo);
+            String endpoint = generateEndpoint();
+
+            LOG.info("Trying to register node at backend: " + endpoint);
+
+            while (!connected) {
+                connected = post(endpoint, body);
+                if (!connected) {
+                    LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+                    try {
+                        Thread.sleep(RETRY_INTERVAL_MS);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+            LOG.info("Successfully registered node at backend");
         } catch (IOException e) {
-            LOG.info("Could not connect to " + url);
+            e.printStackTrace();
         }
+        return connected;
     }
 
-    public boolean updateNodeInfoDescription(NodeInfoDescription desc) {
+    public Message updateNodeInfoDescription(NodeInfoDescription desc) {
         LOG.info("Update node description for node controller: {}", desc.getNodeControllerId());
         this.nodeInfo = desc;
-        return true;
+        return Notifications.success(NotificationType.OPERATION_SUCCESS);
+    }
+
+    public Message activate() {
+        LOG.info("Deactivate node controller");
+        this.nodeInfo.setActive(true);
+        return Notifications.success(NotificationType.OPERATION_SUCCESS);
+    }
+
+    public Message deactivate() {
+        LOG.info("Activate node controller");
+        this.nodeInfo.setActive(false);
+        return Notifications.success(NotificationType.OPERATION_SUCCESS);
+    }
+
+    private boolean post(String endpoint, String body) throws IOException {
+        Response response = Request.Post(endpoint)
+                .bodyString(body, ContentType.APPLICATION_JSON)
+                .connectTimeout(CONNECT_TIMEOUT_MS)
+                .execute();
+        return handleResponse(response);
+    }
+
+    private boolean handleResponse(Response response) throws IOException {
+        String resp = response.returnContent().asString();
+        SuccessMessage message = JacksonSerializer
+                .getObjectMapper()
+                .readValue(resp, SuccessMessage.class);
+        return message.isSuccess();
+    }
+
+    private String generateEndpoint() {
+        return  PROTOCOL
+                + NodeControllerConfig.INSTANCE.getBackendHost()
+                + COLON
+                + NodeControllerConfig.INSTANCE.getBackendPort()
+                + SLASH
+                + BACKEND_BASE_ROUTE
+                + NODE_REGISTRATION_ROUTE;
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 692f3b4..4db36b3 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collections;
 
-public class InvocableElementManager implements InvocableLifeCycle {
+public class InvocableElementManager implements PipelineElementLifeCycle {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(InvocableElementManager.class.getCanonicalName());
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
similarity index 96%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
index 168d998..0ad32ee 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.node.controller.container.management.pe;
 import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.model.Response;
 
-public interface InvocableLifeCycle {
+public interface PipelineElementLifeCycle {
 
     void register(InvocableRegistration registration);
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
new file mode 100644
index 0000000..b5803d5
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.node;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public abstract class AbstractClusterManager {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterManager.class.getCanonicalName());
+
+    private static final String PROTOCOL = "http://";
+    private static final String SLASH = "/";
+    private static final String COLON = ":";
+    private static final long RETRY_INTERVAL_MS = 5000;
+    private static final Object BASE_NODE_CONTROLLER_INFO_ROUTE = "/api/v2/node/info";
+    private static final int CONNECT_TIMEOUT = 1000;
+
+    protected static boolean syncStateUpdateWithRemoteNodeController(NodeInfoDescription desc, boolean activate) {
+        boolean synced = false;
+        String url;
+        if (activate) {
+            url = generateEndpoint(desc, "/activate");
+        } else {
+            url = generateEndpoint(desc, "/deactivate");
+        }
+        LOG.info("Trying to sync state update with node controller=" + url);
+
+        boolean connected = false;
+        while (!connected) {
+            connected = post(url);
+            if (!connected) {
+                LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 10000));
+                try {
+                    Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        synced = true;
+        return synced;
+    }
+
+    protected static boolean syncWithRemoteNodeController(NodeInfoDescription desc) {
+        boolean synced = false;
+        try {
+            String body = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
+            String url = generateEndpoint(desc);
+            LOG.info("Trying to sync description updates with node controller=" + url);
+
+            boolean connected = false;
+            while (!connected) {
+                connected = put(url, body);
+                if (!connected) {
+                    LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 10000));
+                    try {
+                        Thread.sleep(RETRY_INTERVAL_MS);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+            synced = true;
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return synced;
+    }
+
+    protected static String generateEndpoint(NodeInfoDescription desc) {
+        return generateEndpoint(desc, "");
+    }
+
+    protected static String generateEndpoint(NodeInfoDescription desc, String subroute) {
+        return PROTOCOL + desc.getHostname() + COLON + desc.getPort() + BASE_NODE_CONTROLLER_INFO_ROUTE + subroute;
+    }
+
+    protected static boolean put(String url, String body) {
+        try {
+            Request.Put(url)
+                    .bodyString(body, ContentType.APPLICATION_JSON)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+            return true;
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    protected static boolean post(String url) {
+        try {
+            Request.Post(url)
+                    .bodyString("{}", ContentType.APPLICATION_JSON)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+            return true;
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
index d37e23a..3f58f2f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
@@ -17,68 +17,81 @@
  */
 package org.apache.streampipes.manager.node;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.node.NodeInfoDescription;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
 
-public enum NodeClusterManager {
-    INSTANCE;
+public class NodeClusterManager extends AbstractClusterManager {
 
+    private static final Logger LOG = LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
-
-
-    public boolean updateNodeInfoDescription(NodeInfoDescription desc) {
-        boolean successfullyUpdated = false;
-        try {
-            String body = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
-            String url = makeNodeControllerEndpoint(desc);
+    public static List<NodeInfoDescription> getAvailableNodes() {
+        //return new AvailableNodesFetcher().fetchNodes();
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllActiveNodes();
+    }
 
-            LOG.info("Trying to update description for node controller: " + url);
+    public static List<NodeInfoDescription> getAllNodes() {
+        //return new AvailableNodesFetcher().fetchNodes();
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
+    }
 
-            boolean connected = false;
-            while (!connected) {
-                connected = put(url, body);
+    public static Message updateNode(NodeInfoDescription desc) {
+        boolean successfullyUpdated = syncWithRemoteNodeController(desc);
+        if (successfullyUpdated) {
+            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().updateNode(desc);
+            return Notifications.success("Node updated");
+        }
+        return Notifications.error("Could not update node");
+    }
 
-                if (!connected) {
-                    LOG.info("Retrying in 5 seconds");
-                    try {
-                        Thread.sleep(5000);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-            successfullyUpdated = true;
-        } catch (IOException e) {
-            e.printStackTrace();
+    public static boolean deactivateNode(String nodeControllerId) {
+        Optional<NodeInfoDescription> storedNode =
+                StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(nodeControllerId);
+        boolean status = false;
+        if (storedNode.isPresent()) {
+            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deactivateNode(nodeControllerId);
+            status = syncStateUpdateWithRemoteNodeController(storedNode.get(), false);
         }
-        return successfullyUpdated;
+        return status;
     }
 
-    private String makeNodeControllerEndpoint(NodeInfoDescription desc) {
-        return "http://" + desc.getHostname() + ":" + desc.getPort() + "/api/v2/node/info";
+    public static boolean activateNode(String nodeControllerId) {
+        Optional<NodeInfoDescription> storedNode =
+                StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(nodeControllerId);
+        boolean status = false;
+        if (storedNode.isPresent()) {
+            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().activateNode(nodeControllerId);
+            status = syncStateUpdateWithRemoteNodeController(storedNode.get(), true);
+        }
+        return status;
     }
 
-    private boolean put(String url, String body) {
-        try {
-            Request.Put(url)
-                    .bodyString(body, ContentType.APPLICATION_JSON)
-                    .connectTimeout(1000)
-                    .socketTimeout(100000)
-                    .execute();
-            return true;
-        } catch (IOException e) {
-            e.printStackTrace();
+    public static void addNode(NodeInfoDescription desc) {
+        List<NodeInfoDescription> allNodes =
+                StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
+
+        boolean alreadyRegistered = false;
+        if (allNodes.size() > 0) {
+            alreadyRegistered = allNodes.stream()
+                    .anyMatch(n -> n.getNodeControllerId().equals(desc.getNodeControllerId()));
+        }
+
+        if (!alreadyRegistered) {
+            LOG.info("New cluster node join registration request on from http://{}:{}", desc.getHostname(), desc.getPort());
+            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().storeNode(desc);
+            LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
+        } else {
+            LOG.info("Re-joined cluster node from http://{}:{}", desc.getHostname(), desc.getPort());
         }
-        return false;
+    }
+
+    public static void deleteNode(String nodeControllerId) {
+        StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deleteNode(nodeControllerId);
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index eb71a42..a3696a3 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -26,8 +26,6 @@ import org.apache.streampipes.manager.execution.http.PipelineExecutor;
 import org.apache.streampipes.manager.execution.http.PipelineStorageService;
 import org.apache.streampipes.manager.matching.DataSetGroundingSelector;
 import org.apache.streampipes.manager.matching.PipelineVerificationHandler;
-import org.apache.streampipes.manager.node.AvailableNodesFetcher;
-import org.apache.streampipes.manager.node.NodeClusterManager;
 import org.apache.streampipes.manager.recommender.ElementRecommender;
 import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
 import org.apache.streampipes.manager.runtime.PipelineElementRuntimeInfoFetcher;
@@ -40,12 +38,9 @@ import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
 import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
 import org.apache.streampipes.model.message.DataSetModificationMessage;
 import org.apache.streampipes.model.message.Message;
-import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.message.PipelineModificationMessage;
-import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineElementRecommendationMessage;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
@@ -188,41 +183,4 @@ public class Operations {
   public static String getRuntimeInfo(SpDataStream spDataStream) throws SpRuntimeException {
     return PipelineElementRuntimeInfoFetcher.INSTANCE.getCurrentData(spDataStream);
   }
-
-  public static List<NodeInfoDescription> getAvailableNodes() {
-    //return new AvailableNodesFetcher().fetchNodes();
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllActiveNodes();
-  }
-
-  public static List<NodeInfoDescription> getAllNodes() {
-    //return new AvailableNodesFetcher().fetchNodes();
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
-  }
-
-
-  public static Message updateNode(NodeInfoDescription desc) {
-    boolean successfullyUpdated = NodeClusterManager.INSTANCE.updateNodeInfoDescription(desc);
-    if (successfullyUpdated) {
-      StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().updateNode(desc);
-      return Notifications.success("Node modified");
-    }
-    return Notifications.error("Could not modify node");
-  }
-
-  public static void addNode(NodeInfoDescription desc) {
-
-    List<NodeInfoDescription> allNodes =
-            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
-
-    boolean alreadyRegistered = allNodes.stream()
-            .anyMatch(n -> n.getNodeControllerId().equals(desc.getNodeControllerId()));
-
-    if (!alreadyRegistered) {
-      StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().storeNode(desc);
-    }
-  }
-
-  public static void deleteNode(String nodeControllerId) {
-    StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deleteNode(nodeControllerId);
-  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index d522345..d4f1b1f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -32,7 +32,7 @@ import java.util.*;
 
 public class CouchDbInstallationStep implements InstallationStep {
 
-    private static List<String> initRdfEndpointPorts =
+    private static final List<String> initRdfEndpointPorts =
             Collections.singletonList("8099/api/v1/admin@streampipes.org/master/sources/");
     private static final String initRdfEndpointHost = "http://localhost:";
 
@@ -79,6 +79,7 @@ public class CouchDbInstallationStep implements InstallationStep {
             Utils.getCouchDbDashboardWidgetClient();
             Utils.getCouchDbLabelClient();
             Utils.getCouchDbCategoryClient();
+            Utils.getCouchDbNodeClient();
 
             return Collections.singletonList(Notifications.success(getTitle()));
         } catch (Exception e) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
index 7ec4697..b5ba229 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/extractor/TypeExtractor.java
@@ -29,23 +29,28 @@ import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.graph.DataSourceDescription;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.LoggerFactory;
 
 import java.util.logging.Logger;
 
 public class TypeExtractor {
+	private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TypeExtractor.class.getCanonicalName());
 
-	private static final Logger logger = Logger.getAnonymousLogger();
+	private static final String DATA_PROCESSOR_PREFIX = "sepa";
+	private static final String DATA_SINK_PREFIX = "sec";
+	private static final String DATA_SOURCE_PREFIX = "sep";
 
-	private String pipelineElementDescription;
+	private final String pipelineElementDescription;
 	
 	public TypeExtractor(String pipelineElementDescription) {
 		this.pipelineElementDescription = pipelineElementDescription;
-
 	}
 	
 	public ElementVerifier<?> getTypeVerifier() throws SepaParseException {
 		try {
-			ObjectNode jsonNode = JacksonSerializer.getObjectMapper().readValue(this.pipelineElementDescription, ObjectNode.class);
+			ObjectNode jsonNode = JacksonSerializer
+					.getObjectMapper()
+					.readValue(this.pipelineElementDescription, ObjectNode.class);
 			String jsonClassName = jsonNode.get("@class").asText();
 			return getTypeDef(jsonClassName);
 		} catch (JsonProcessingException e) {
@@ -57,24 +62,30 @@ public class TypeExtractor {
 		if (jsonClassName == null) {
 			throw new SepaParseException();
 		} else {
-			if (jsonClassName.equals(ep())) { logger.info("Detected type sep"); return new SepVerifier(pipelineElementDescription); }
-			else if (jsonClassName.equals(epa())) { logger.info("Detected type sepa"); return new SepaVerifier(pipelineElementDescription); }
-			else if (jsonClassName.equals(ec())) { logger.info("Detected type sec"); return new SecVerifier(pipelineElementDescription); }
-			else throw new SepaParseException();
+			if (jsonClassName.equals(ep())) {
+				LOG.debug("Detected active pipeline element type {}", DATA_SOURCE_PREFIX);
+				return new SepVerifier(pipelineElementDescription);
+			} else if (jsonClassName.equals(epa())) {
+				LOG.info("Detected active pipeline element type {}", DATA_PROCESSOR_PREFIX);
+				return new SepaVerifier(pipelineElementDescription);
+			} else if (jsonClassName.equals(ec())) {
+				LOG.info("Detected active pipeline element type {}", DATA_SINK_PREFIX);
+				return new SecVerifier(pipelineElementDescription);
+			} else throw new SepaParseException();
 		}
 	}
 	
-	private static final String ep()
+	private static String ep()
 	{
 		return DataSourceDescription.class.getCanonicalName();
 	}
 	
-	private static final String epa()
+	private static String epa()
 	{
 		return DataProcessorDescription.class.getCanonicalName();
 	}
 	
-	private static final String ec()
+	private static String ec()
 	{
 		return DataSinkDescription.class.getCanonicalName();
 	}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java
index e5ae656..484d084 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/INode.java
@@ -29,6 +29,8 @@ public interface INode {
 
     Response deleteNode(String username, String nodeControllerId);
 
+    Response changeNodeState(String action, String username, String nodeControllerId);
+
     Response getAvailableNodes();
 
     Response getNodes();
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
index b7f2dee..df43c35 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
@@ -17,14 +17,12 @@
  */
 package org.apache.streampipes.rest.impl;
 
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.node.NodeClusterManager;
+import org.apache.streampipes.model.message.NotificationType;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.rest.api.INode;
-import org.apache.streampipes.rest.shared.annotation.GsonClientModel;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
@@ -32,18 +30,20 @@ import javax.ws.rs.core.Response;
 
 @Path("/v2/users/{username}/nodes")
 public class Node extends AbstractRestInterface implements INode {
-    private static final Logger LOG = LoggerFactory.getLogger(Node.class.getCanonicalName());
+
+    private static final String ACTIVATE = "activate";
+    private static final String DEACTIVATE = "deactivate";
 
     @POST
     @JacksonSerialized
     @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response addNode(@PathParam("username") String username, NodeInfoDescription desc) {
-        Operations.addNode(desc);
-        return statusMessage(Notifications.success("Node added"));
+        NodeClusterManager.addNode(desc);
+        return statusMessage(Notifications.success(NotificationType.STORAGE_SUCCESS));
     }
 
-
     @PUT
     @JacksonSerialized
     @Path("/{nodeControllerId}")
@@ -52,7 +52,29 @@ public class Node extends AbstractRestInterface implements INode {
     @Override
     public Response updateNode(@PathParam("username") String username,
                                @PathParam("nodeControllerId") String nodeControllerId, NodeInfoDescription desc) {
-        return statusMessage(Operations.updateNode(desc));
+        return statusMessage(NodeClusterManager.updateNode(desc));
+    }
+
+    @POST
+    @JacksonSerialized
+    @Path("/{action}/{nodeControllerId}")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Override
+    public Response changeNodeState(@PathParam("action") String action,
+                                    @PathParam("username") String username,
+                                    @PathParam("nodeControllerId") String nodeControllerId) {
+        boolean success = false;
+        if (action.equals(ACTIVATE)) {
+            success = NodeClusterManager.activateNode(nodeControllerId);
+        } else if (action.equals(DEACTIVATE)) {
+            success = NodeClusterManager.deactivateNode(nodeControllerId);
+        }
+        if (success) {
+            return statusMessage(Notifications.success(NotificationType.OPERATION_SUCCESS));
+        } else {
+            return statusMessage(Notifications.error(NotificationType.NODE_STATE_UPDATE_ERROR));
+        }
     }
 
     @DELETE
@@ -60,8 +82,8 @@ public class Node extends AbstractRestInterface implements INode {
     @Override
     public Response deleteNode(@PathParam("username") String username,
                                @PathParam("nodeControllerId") String nodeControllerId) {
-        Operations.deleteNode(nodeControllerId);
-        return statusMessage(Notifications.success("Node deleted"));
+        NodeClusterManager.deleteNode(nodeControllerId);
+        return statusMessage(Notifications.success(NotificationType.REMOVED_NODE));
     }
 
     @GET
@@ -70,8 +92,7 @@ public class Node extends AbstractRestInterface implements INode {
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response getAvailableNodes() {
-        // TODO: get from couchdb not from consul
-        return ok(Operations.getAvailableNodes());
+        return ok(NodeClusterManager.getAvailableNodes());
     }
 
     @GET
@@ -79,6 +100,6 @@ public class Node extends AbstractRestInterface implements INode {
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response getNodes() {
-        return ok(Operations.getAllNodes());
+        return ok(NodeClusterManager.getAllNodes());
     }
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java
index 54b23f8..cb9a033 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/nouser/PipelineElementImportNoUser.java
@@ -42,7 +42,7 @@ public class PipelineElementImportNoUser extends AbstractRestInterface {
   @Context
   UriInfo uri;
 
-  @Path("/")
+//  @Path("/")
   @POST
   @Produces(MediaType.APPLICATION_JSON)
   public Response addElement(@PathParam("username") String username, @FormParam("uri") String uri, @FormParam("publicElement") boolean publicElement) {
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
index 637b2c4..e035491 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java
@@ -24,12 +24,14 @@ import org.apache.streampipes.model.constants.PropertySelectorConstants;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.grounding.TransportFormat;
 import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.model.staticproperty.MappingProperty;
 import org.apache.streampipes.model.staticproperty.MappingPropertyNary;
 import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
+import org.apache.streampipes.sdk.helpers.CollectedResourceRequirements;
 import org.apache.streampipes.sdk.helpers.CollectedStreamRequirements;
 import org.apache.streampipes.sdk.helpers.Label;
 
@@ -43,6 +45,8 @@ public abstract class AbstractProcessingElementBuilder<BU extends
 
   protected List<SpDataStream> streamRequirements;
 
+  protected List<NodeResourceRequirement> resourceRequirements;
+
   protected List<EventProperty> stream1Properties;
   protected List<EventProperty> stream2Properties;
 
@@ -57,6 +61,7 @@ public abstract class AbstractProcessingElementBuilder<BU extends
     this.streamRequirements = new ArrayList<>();
     this.stream1Properties = new ArrayList<>();
     this.stream2Properties = new ArrayList<>();
+    this.resourceRequirements = new ArrayList<>();
     this.supportedGrounding = new EventGrounding();
   }
 
@@ -65,6 +70,7 @@ public abstract class AbstractProcessingElementBuilder<BU extends
     this.streamRequirements = new ArrayList<>();
     this.stream1Properties = new ArrayList<>();
     this.stream2Properties = new ArrayList<>();
+    this.resourceRequirements = new ArrayList<>();
     this.supportedGrounding = new EventGrounding();
   }
 
@@ -93,7 +99,15 @@ public abstract class AbstractProcessingElementBuilder<BU extends
     return me();
   }
 
-  public BU requiredResource() {
+  /**
+   * Set a new resource requirement by adding restrictions on node. Use
+   * {@link ResourceRequirementsBuilder} to create requirements.
+   *
+   * @param resourceRequirements: A bundle of collected {@link CollectedResourceRequirements}
+   * @return this
+   */
+  public BU requiredResource(CollectedResourceRequirements resourceRequirements) {
+    this.resourceRequirements.addAll(resourceRequirements.getResourceRequirements());
     return me();
   }
 
@@ -279,6 +293,9 @@ public abstract class AbstractProcessingElementBuilder<BU extends
 
     this.elementDescription.setSpDataStreams(streamRequirements);
 
+    if (this.resourceRequirements.size() > 0) {
+      this.elementDescription.setResourceRequirements(resourceRequirements);
+    }
   }
 
   private SpDataStream buildStream(List<EventProperty> streamProperties) {
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java
index 9a1dbbf..1f99575 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java
@@ -102,6 +102,7 @@ public class ProcessingElementBuilder extends AbstractProcessingElementBuilder<P
   public void prepareBuild() {
     super.prepareBuild();
     this.elementDescription.setOutputStrategies(outputStrategies);
+    this.elementDescription.setResourceRequirements(resourceRequirements);
   }
 
   @Override
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ResourceRequirementsBuilder.java
similarity index 50%
copy from streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java
copy to streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ResourceRequirementsBuilder.java
index cc1cb92..94b3590 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ResourceRequirementsBuilder.java
@@ -15,25 +15,33 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.sdk.builder;
 
-package org.apache.streampipes.storage.api;
 
-import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
+import org.apache.streampipes.sdk.helpers.CollectedResourceRequirements;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 
-public interface INodeInfoStorage {
+public class ResourceRequirementsBuilder {
 
-    List<NodeInfoDescription> getAllNodes();
+    private final List<NodeResourceRequirement> resourceRequirements;
 
-    List<NodeInfoDescription> getAllActiveNodes();
+    public static ResourceRequirementsBuilder create() {
+        return new ResourceRequirementsBuilder();
+    }
 
-    void storeNode(NodeInfoDescription desc);
+    public ResourceRequirementsBuilder() {
+        this.resourceRequirements = new ArrayList<>();
+    }
 
-    void updateNode(NodeInfoDescription desc);
+    public ResourceRequirementsBuilder requiredProperty(NodeResourceRequirement nrp) {
+        this.resourceRequirements.add(nrp);
+        return this;
+    }
 
-    Optional<NodeInfoDescription> getNode(String nodeControllerId);
-
-    void deleteNode(String nodeControllerId);
+    public CollectedResourceRequirements build() {
+        return new CollectedResourceRequirements(this.resourceRequirements);
+    }
 }
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java
index 20a8786..77b54db 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java
@@ -17,5 +17,28 @@
  */
 package org.apache.streampipes.sdk.helpers;
 
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
+
+import java.util.ArrayList;
+import java.util.List;
+
 public class CollectedResourceRequirements {
+
+    private List<NodeResourceRequirement> resourceRequirements;
+
+    public CollectedResourceRequirements(List<NodeResourceRequirement> resourceRequirements) {
+        this.resourceRequirements = resourceRequirements;
+    }
+
+    public CollectedResourceRequirements() {
+        this.resourceRequirements = new ArrayList<>();
+    }
+
+    public List<NodeResourceRequirement> getResourceRequirements() {
+        return resourceRequirements;
+    }
+
+    public void setResourceRequirements(List<NodeResourceRequirement> resourceRequirements) {
+        this.resourceRequirements = resourceRequirements;
+    }
 }
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/NodeResourceRequirements.java
similarity index 79%
copy from streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java
copy to streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/NodeResourceRequirements.java
index 20a8786..457fba5 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/CollectedResourceRequirements.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/NodeResourceRequirements.java
@@ -17,5 +17,13 @@
  */
 package org.apache.streampipes.sdk.helpers;
 
-public class CollectedResourceRequirements {
+import org.apache.streampipes.model.resource.Hardware;
+
+public class NodeResourceRequirements {
+
+    public static Hardware gpu() {
+        Hardware rp = new Hardware();
+        rp.setGpu(true);
+        return rp;
+    }
 }
diff --git a/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java b/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
index 357a5e4..cf056c2 100644
--- a/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
+++ b/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
@@ -49,6 +49,7 @@ import org.apache.streampipes.model.output.OutputStrategy;
 import org.apache.streampipes.model.quality.EventPropertyQualityDefinition;
 import org.apache.streampipes.model.quality.EventStreamQualityDefinition;
 import org.apache.streampipes.model.quality.Frequency;
+import org.apache.streampipes.model.resource.NodeResourceRequirement;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.ValueSpecification;
 import org.apache.streampipes.model.staticproperty.MappingProperty;
@@ -118,6 +119,7 @@ public class GsonSerializer {
 
     builder.registerTypeAdapter(ContainerRuntime.class, new JsonLdSerializer<ContainerRuntime>());
     builder.registerTypeAdapter(DeploymentContainer.class, new JsonLdSerializer<DeploymentContainer>());
+    builder.registerTypeAdapter(NodeResourceRequirement.class, new JsonLdSerializer<NodeResourceRequirement>());
 
     builder.setPrettyPrinting();
     return builder;
diff --git a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
index 4a3cf13..b9ee0fa 100644
--- a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
+++ b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
@@ -57,6 +57,7 @@ import org.apache.streampipes.model.node.resources.software.NvidiaContainerRunti
 import org.apache.streampipes.model.node.resources.software.SoftwareResource;
 import org.apache.streampipes.model.output.*;
 import org.apache.streampipes.model.quality.*;
+import org.apache.streampipes.model.resource.Hardware;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
 import org.apache.streampipes.model.schema.*;
@@ -211,7 +212,8 @@ public class CustomAnnotationProvider implements EmpireAnnotationProvider {
             DISK.class,
             GPU.class,
             SoftwareResource.class,
-            FieldDeviceAccessResource.class
+            FieldDeviceAccessResource.class,
+            Hardware.class
     );
   }
 }
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java
index cc1cb92..b1dfa29 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeInfoStorage.java
@@ -31,6 +31,10 @@ public interface INodeInfoStorage {
 
     void storeNode(NodeInfoDescription desc);
 
+    void deactivateNode(String nodeControllerId);
+
+    void activateNode(String nodeControllerId);
+
     void updateNode(NodeInfoDescription desc);
 
     Optional<NodeInfoDescription> getNode(String nodeControllerId);
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java
index 6a8a7e4..5107bfa 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeInfoStorageImpl.java
@@ -50,12 +50,36 @@ public class NodeInfoStorageImpl extends AbstractDao<NodeInfoDescription> implem
 
     @Override
     public void storeNode(NodeInfoDescription desc) {
-        LOG.info("Store new node description with node id={}, url={}", desc.getNodeControllerId(),
-                desc.getHostname() + ":" + desc.getPort());
         persist(desc);
     }
 
     @Override
+    public void deactivateNode(String nodeControllerId) {
+        Optional<NodeInfoDescription> storedNode = getNode(nodeControllerId);
+        if (storedNode.isPresent()) {
+            LOG.info("Deactivate node controller={} at url=http://{}:{}",
+                    storedNode.get().getNodeControllerId(),
+                    storedNode.get().getHostname(),
+                    storedNode.get().getPort());
+            storedNode.get().setActive(false);
+            update(storedNode.get());
+        }
+    }
+
+    @Override
+    public void activateNode(String nodeControllerId) {
+        Optional<NodeInfoDescription> storedNode = getNode(nodeControllerId);
+        if (storedNode.isPresent()) {
+            LOG.info("Activate node controller={} at url=http://{}:{}",
+                    storedNode.get().getNodeControllerId(),
+                    storedNode.get().getHostname(),
+                    storedNode.get().getPort());
+            storedNode.get().setActive(true);
+            update(storedNode.get());
+        }
+    }
+
+    @Override
     public void updateNode(NodeInfoDescription desc) {
         LOG.info("Update node description for node id={}, url={}", desc.getNodeControllerId(),
                 desc.getHostname() + ":" + desc.getPort());
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index b706a6a..e2a2150 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -484,4 +484,9 @@ public class StreamPipes {
   public static final String HAS_HARDWARE_RESOURCES = NS + "hasHardwareResources";
   public static final String HAS_CONTAINER_RUNTIME_SERVER_VERSION = NS + "hasContainerRuntimeServerVersion";
   public static final String HAS_CONTAINER_RUNTIME_API_VERSION = NS + "hasContainerRuntimeApiVersion";
+  public static final String HARDWARE_REQUIREMENT = NS + "HardwareRequirement";
+  public static final String HAS_GPU_REQUIREMENT = NS + "hasGpuRequirement";
+  public static final String HAS_NODE_RESOURCE_PROPERTY = NS + "hasNodeResourceProperty";
+  public static final String NODE_RESOURCE_REQUIREMENT = NS + "NodeResourceRequirement";
+  public static final String REQUIRES_RESOURCES = NS + "requiresResources";
 }
diff --git a/ui/src/app/configuration/node-configuration/node-configuration.component.ts b/ui/src/app/configuration/node-configuration/node-configuration.component.ts
index 7f295c6..46cd508 100644
--- a/ui/src/app/configuration/node-configuration/node-configuration.component.ts
+++ b/ui/src/app/configuration/node-configuration/node-configuration.component.ts
@@ -110,7 +110,13 @@ export class NodeConfigurationComponent implements OnInit{
                     // No adapters detected on this node. This means that no adapter was created on this host. Thus
                     // we can safely proceed setting a new state, i.e. active = (true || false) this node
                     node.active = desiredState;
-                    this.nodeService.updateNodeState(node).subscribe(statusMessage => {
+                    let stateService;
+                    if (desiredState) {
+                        stateService = this.nodeService.activateNode(node.nodeControllerId);
+                    } else {
+                        stateService = this.nodeService.deactivateNode(node.nodeControllerId);
+                    }
+                    stateService.subscribe(statusMessage => {
                         if(statusMessage.success) {
                             this.openSnackBar("Node successfully " + (desiredState ? "activated" : "deactivated"))
                         }
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 2650545..44fde71 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,10 +19,11 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-12-30 22:52:01.
+// Generated using typescript-generator version 2.24.612 on 2021-01-09 20:47:18.
 
 export class AbstractStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+    "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+    elementId: string;
 
     static fromData(data: AbstractStreamPipesEntity, target?: AbstractStreamPipesEntity): AbstractStreamPipesEntity {
         if (!data) {
@@ -30,13 +31,13 @@ export class AbstractStreamPipesEntity {
         }
         const instance = target || new AbstractStreamPipesEntity();
         instance["@class"] = data["@class"];
+        instance.elementId = data.elementId;
         return instance;
     }
 }
 
 export class UnnamedStreamPipesEntity extends AbstractStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
-    elementId: string;
+    "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
 
     static fromData(data: UnnamedStreamPipesEntity, target?: UnnamedStreamPipesEntity): UnnamedStreamPipesEntity {
         if (!data) {
@@ -44,7 +45,6 @@ export class UnnamedStreamPipesEntity extends AbstractStreamPipesEntity {
         }
         const instance = target || new UnnamedStreamPipesEntity();
         super.fromData(data, instance);
-        instance.elementId = data.elementId;
         return instance;
     }
 }
@@ -125,7 +125,6 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
     connectedTo: string[];
     description: string;
     dom: string;
-    elementId: string;
     iconUrl: string;
     includedAssets: string[];
     includedLocales: string[];
@@ -143,7 +142,6 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
         instance.name = data.name;
         instance.description = data.description;
         instance.iconUrl = data.iconUrl;
-        instance.elementId = data.elementId;
         instance.appId = data.appId;
         instance.includesAssets = data.includesAssets;
         instance.includesLocales = data.includesLocales;
@@ -1009,6 +1007,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
     elementEndpointPort: number;
     elementEndpointServiceName: string;
     inputStreams: SpDataStreamUnion[];
+    resourceRequirements: NodeResourceRequirementUnion[];
     staticProperties: StaticPropertyUnion[];
     statusInfoSettings: ElementStatusInfoSettings;
     streamRequirements: SpDataStreamUnion[];
@@ -1029,6 +1028,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
         instance.correspondingPipeline = data.correspondingPipeline;
         instance.correspondingUser = data.correspondingUser;
         instance.streamRequirements = __getCopyArrayFn(SpDataStream.fromDataUnion)(data.streamRequirements);
+        instance.resourceRequirements = __getCopyArrayFn(NodeResourceRequirement.fromDataUnion)(data.resourceRequirements);
         instance.elementEndpointHostname = data.elementEndpointHostname;
         instance.elementEndpointPort = data.elementEndpointPort;
         instance.deploymentTargetNodeId = data.deploymentTargetNodeId;
@@ -1847,6 +1847,44 @@ export class GuessSchema extends UnnamedStreamPipesEntity {
     }
 }
 
+export class NodeResourceRequirement extends UnnamedStreamPipesEntity {
+    "@class": "org.apache.streampipes.model.resource.NodeResourceRequirement" | "org.apache.streampipes.model.resource.Hardware";
+
+    static fromData(data: NodeResourceRequirement, target?: NodeResourceRequirement): NodeResourceRequirement {
+        if (!data) {
+            return data;
+        }
+        const instance = target || new NodeResourceRequirement();
+        super.fromData(data, instance);
+        return instance;
+    }
+
+    static fromDataUnion(data: NodeResourceRequirementUnion): NodeResourceRequirementUnion {
+        if (!data) {
+            return data;
+        }
+        switch (data["@class"]) {
+            case "org.apache.streampipes.model.resource.Hardware":
+                return Hardware.fromData(data);
+        }
+    }
+}
+
+export class Hardware extends NodeResourceRequirement {
+    "@class": "org.apache.streampipes.model.resource.Hardware";
+    gpu: boolean;
+
+    static fromData(data: Hardware, target?: Hardware): Hardware {
+        if (!data) {
+            return data;
+        }
+        const instance = target || new Hardware();
+        super.fromData(data, instance);
+        instance.gpu = data.gpu;
+        return instance;
+    }
+}
+
 export class HardwareResource extends UnnamedStreamPipesEntity {
     "@class": "org.apache.streampipes.model.node.resources.hardware.HardwareResource";
     cpu: CPU;
@@ -2343,6 +2381,7 @@ export class Pipeline extends ElementComposition {
     eventRelayStrategy: string;
     pipelineCategories: string[];
     publicElement: boolean;
+    restartOnSystemReboot: boolean;
     running: boolean;
     startedAt: number;
 
@@ -2354,6 +2393,7 @@ export class Pipeline extends ElementComposition {
         super.fromData(data, instance);
         instance.actions = __getCopyArrayFn(DataSinkInvocation.fromData)(data.actions);
         instance.running = data.running;
+        instance.restartOnSystemReboot = data.restartOnSystemReboot;
         instance.startedAt = data.startedAt;
         instance.createdAt = data.createdAt;
         instance.publicElement = data.publicElement;
@@ -3225,6 +3265,8 @@ export type MappingPropertyUnion = MappingPropertyNary | MappingPropertyUnary;
 
 export type MeasurementPropertyUnion = EventPropertyQualityDefinition | EventStreamQualityDefinition;
 
+export type NodeResourceRequirementUnion = Hardware;
+
 export type OneOfStaticPropertyUnion = RuntimeResolvableOneOfStaticProperty;
 
 export type OutputStrategyUnion = AppendOutputStrategy | CustomOutputStrategy | CustomTransformOutputStrategy | FixedOutputStrategy | KeepOutputStrategy | ListOutputStrategy | TransformOutputStrategy | UserDefinedOutputStrategy;
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index fef866c..2571ead 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -23,7 +23,7 @@ import {
   Message,
   Pipeline,
   NodeInfoDescription,
-  StaticNodeMetadata
+  StaticNodeMetadata, NvidiaContainerRuntime, DockerContainerRuntime, ContainerRuntime, DataSinkInvocation
 } from "../../../core-model/gen/streampipes-model";
 import {ObjectProvider} from "../../services/object-provider.service";
 import {EditorService} from "../../services/editor.service";
@@ -310,6 +310,4 @@ export class SavePipelineComponent implements OnInit {
       this.applyDefaultPolicy(this.tmpPipeline.sepas);
     }
   }
-
-
 }
\ No newline at end of file
diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
index 1a99f9d..03a442b 100644
--- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
+++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html
@@ -78,7 +78,7 @@
                             matTooltipPosition="above"
                             [disabled]="!pipeline.running"
                             (click)="pipelineOperationsService.migratePipelineProcessors(pipeline)">
-                        <i class="material-icons">storage</i>
+                        <i class="material-icons">swap_horizontal_circle_filled</i>
                     </button>
                 </span>
                     <span fxFlex fxFlexOrder="4" fxLayout="row" fxLayoutAlign="center center">
diff --git a/ui/src/app/platform-services/apis/node.service.ts b/ui/src/app/platform-services/apis/node.service.ts
index 290d36a..921edb5 100644
--- a/ui/src/app/platform-services/apis/node.service.ts
+++ b/ui/src/app/platform-services/apis/node.service.ts
@@ -52,4 +52,18 @@ export class NodeService {
                 return Message.fromData(response as Message);
             }));
     }
+
+    activateNode(nodeControllerId: string): Observable<Message> {
+        return this.http.post(this.platformServicesCommons.authUserBasePath() + '/nodes/activate/' + nodeControllerId, {})
+            .pipe(map(response => {
+                return Message.fromData(response as Message);
+            }));
+    }
+
+    deactivateNode(nodeControllerId: string): Observable<Message> {
+        return this.http.post(this.platformServicesCommons.authUserBasePath() + '/nodes/deactivate/' + nodeControllerId, {})
+            .pipe(map(response => {
+                return Message.fromData(response as Message);
+            }));
+    }
 }
\ No newline at end of file