You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/04 00:43:05 UTC

[incubator-streampipes] branch edge-extensions updated (dfbea97 -> 6a7c1e6)

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a change to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from dfbea97  Update Pipeline Status Dialog for multiple edge nodes and migration
     new afacf89  [WIP] refactoring pipeline element migration and node controller sync on rejoin
     add 7905969  [STREAMPIPES-275] Support more static properties in templates
     add 32bb1f2  Merge branch 'dev' of github.com:apache/incubator-streampipes into dev
     add e20bf08  [hotfix] Add pipeline API resource to API docs
     add 71b4c5f  [hotfix] Fix adapter preview in UI
     add ac37be8  [hotfix] Minor layout improvements in adapter started dialog
     add 559cae4  [STREAMPIPES-302] Modify adapter resource paths for data set invocation
     add 03db03b  [hotfix] Add minor layout improvements to dashboard
     add 11675bf  [STREAMPIPES-303] Add autocomplete support for semantic type selection in StreamPipes Connect
     add b9aab80  [STREAMPIPES-245] Remove debug mode from monitoring feature
     add ff43f77  [STREAMPIPES-307] Extend SDK to add default code block
     new 6a7c1e6  Merge branch 'dev' into edge-extensions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/resources/openapi.yaml                |   2 +-
 streampipes-backend/src/main/resources/shiro.ini   |   4 +-
 .../container/master/rest/SourcesResource.java     |   4 +-
 .../model/{ => eventrelay}/SpDataStreamRelay.java  |   2 +-
 .../SpDataStreamRelayContainer.java                |  58 ++-
 .../model/eventrelay}/metrics/RelayMetrics.java    |   2 +-
 .../model/graph/DataProcessorDescription.java      |   2 +-
 .../model/graph/DataProcessorInvocation.java       |   2 +-
 .../model/message/NotificationType.java            |   5 +-
 .../pipeline/PipelineElementMigrationEntity.java   |  51 ++
 .../template/PipelineElementTemplateConfig.java    |   8 +
 .../org/apache/streampipes/model/util/Cloner.java  |   2 +-
 streampipes-node-controller-container/pom.xml      |   4 +
 .../api/DataProcessorPipelineElementResource.java  |  30 --
 .../api/DataSinkPipelineElementResource.java       |  30 --
 ...yResource.java => DataStreamRelayResource.java} |   8 +-
 .../container/api/InvocableEntityResource.java     |  55 +--
 .../api/NodeControllerResourceConfig.java          |   4 +-
 ...ource.java => NodeInfoDescriptionResource.java} |   4 +-
 .../container/management/node/NodeManager.java     |   5 +-
 .../management/pe/InvocableElementManager.java     | 111 +++--
 .../management/pe/PipelineElementLifeCycle.java    |   3 +-
 .../management/relay/DataStreamRelayManager.java   | 108 ++---
 .../container/management/relay/EventRelay.java     |   6 +-
 .../relay/bridges/MultiBrokerBridge.java           |   3 +-
 .../manager/execution/http/ElementSubmitter.java   | 162 +++++++
 .../manager/execution/http/GraphSubmitter.java     | 208 ++++-----
 .../manager/execution/http/HttpRequestBuilder.java |   9 +-
 .../manager/execution/http/MigrationHelpers.java   |  75 ---
 .../manager/execution/http/PipelineExecutor.java   | 513 ---------------------
 .../http/StreamRelayEndpointUrlGenerator.java      |   2 +-
 .../pipeline/AbstractPipelineExecutor.java         | 458 ++++++++++++++++++
 .../execution/pipeline/PipelineExecutor.java       | 110 +++++
 .../pipeline/PipelineMigrationExecutor.java        | 349 ++++++++++++++
 .../pipeline/PipelineMigrationHelpers.java         |   6 +-
 .../{http => pipeline}/PipelineStorageService.java |   2 +-
 .../execution/pipeline/migration/Command.java      |   8 +-
 .../migration/PipelineElementStartCommand.java     |  17 +-
 .../migration/PipelineElementStopCommand.java      |  16 +-
 .../pipeline/migration/RelayStartCommand.java      |  14 +-
 .../pipeline/migration/RelayStopCommand.java       |  14 +-
 .../manager/matching/InvocationGraphBuilder.java   |   2 +-
 .../migration/PipelineElementMigrationHandler.java | 147 ++++++
 .../monitoring/pipeline/TopicInfoCollector.java    |   4 -
 .../manager/node/AbstractClusterManager.java       | 112 +++--
 .../manager/node/AvailableNodesFetcher.java        |   1 +
 .../manager/node/NodeClusterManager.java           | 112 ++++-
 .../streampipes/manager/node/NodeSyncOptions.java  |   7 +-
 .../streampipes/manager/operations/Operations.java |  51 +-
 .../template/PipelineElementTemplateHandler.java   |   7 +-
 .../template/PipelineElementTemplateVisitor.java   |  95 +++-
 .../apache/streampipes/rest/impl/AutoComplete.java |  31 +-
 .../org/apache/streampipes/rest/impl/Node.java     |   6 +-
 .../streampipes/rest/impl/PipelineResource.java    |   9 +-
 ...AbstractConfigurablePipelineElementBuilder.java |  18 +-
 .../serializers/json/GsonSerializer.java           |   2 +
 .../jsonld/CustomAnnotationProvider.java           |   2 +
 .../serializers/jsonld/JsonLdTransformer.java      |   1 +
 .../streampipes/storage/api/INoSqlStorage.java     |   2 +
 ...eInfoStorage.java => INodeDataStreamRelay.java} |  22 +-
 .../storage/couchdb/CouchDbStorageManager.java     |   5 +-
 .../couchdb/impl/NodeDataStreamRelayImpl.java      |  76 +++
 .../streampipes/storage/couchdb/utils/Utils.java   |   6 +
 .../org/apache/streampipes/vocabulary/Geo.java     |   7 +
 .../apache/streampipes/vocabulary/Geonames.java    |  13 +
 .../java/org/apache/streampipes/vocabulary/SO.java | 439 ++++++++++++++++++
 .../apache/streampipes/vocabulary/SPSensor.java    |  14 +
 .../vocabulary/SemanticTypeRegistry.java           |  37 +-
 .../event-schema/event-schema.component.html       |   6 +-
 .../adapter-started-dialog.component.html          |  31 +-
 .../adapter-started-dialog.component.scss          |   9 +-
 .../adapter-started-dialog.component.ts            |   4 +-
 .../edit-event-property.component.html             |   9 +-
 .../edit-event-property.component.ts               |  20 +-
 ui/src/app/connect/services/rest.service.ts        |   6 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  14 +-
 .../components/grid/dashboard-grid.component.ts    |   3 +
 .../components/panel/dashboard-panel.component.ts  |   4 +-
 .../widgets/table/table-widget.component.css       |   9 +-
 .../widgets/table/table-widget.component.html      |   6 +-
 .../widgets/table/table-widget.component.ts        |   3 +-
 .../pipeline-element-template-generator.ts         |  54 ++-
 .../pipeline-status-dialog.component.ts            |  12 +-
 ...toring.service.ts => semantic-types.service.ts} |  19 +-
 ui/src/app/platform-services/platform.module.ts    |   5 +-
 85 files changed, 2649 insertions(+), 1269 deletions(-)
 rename streampipes-model/src/main/java/org/apache/streampipes/model/{ => eventrelay}/SpDataStreamRelay.java (98%)
 rename streampipes-model/src/main/java/org/apache/streampipes/model/{ => eventrelay}/SpDataStreamRelayContainer.java (66%)
 rename {streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay => streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay}/metrics/RelayMetrics.java (96%)
 create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementMigrationEntity.java
 delete mode 100644 streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
 delete mode 100644 streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
 rename streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/{AdapterDataStreamRelayResource.java => DataStreamRelayResource.java} (82%)
 rename streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/{InfoStatusResource.java => NodeInfoDescriptionResource.java} (97%)
 create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
 delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/MigrationHelpers.java
 delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
 create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
 create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
 create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
 copy streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java (89%)
 rename streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/{http => pipeline}/PipelineStorageService.java (98%)
 copy streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java (85%)
 copy ui/src/app/info/about/about.component.ts => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java (78%)
 copy ui/src/app/info/about/about.component.ts => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java (78%)
 copy ui/src/app/core-model/datalake/DateRange.ts => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java (79%)
 copy ui/src/app/core-model/datalake/DateRange.ts => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java (79%)
 create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
 copy streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java => streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java (85%)
 copy streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/{INodeInfoStorage.java => INodeDataStreamRelay.java} (63%)
 create mode 100644 streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeDataStreamRelayImpl.java
 copy streampipes-maven-plugin/src/main/java/org/apache/streampipes/smp/util/DuplicateRemover.java => streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/SemanticTypeRegistry.java (58%)
 copy ui/src/app/platform-services/apis/{pipeline-monitoring.service.ts => semantic-types.service.ts} (73%)


[incubator-streampipes] 01/02: [WIP] refactoring pipeline element migration and node controller sync on rejoin

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit afacf8916c4433f233a34b883dd6dc5209c4eddb
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Mar 4 01:40:02 2021 +0100

    [WIP] refactoring pipeline element migration and node controller sync on rejoin
---
 .../model/{ => eventrelay}/SpDataStreamRelay.java  |   2 +-
 .../SpDataStreamRelayContainer.java                |  58 ++-
 .../model/eventrelay}/metrics/RelayMetrics.java    |   2 +-
 .../model/graph/DataProcessorDescription.java      |   2 +-
 .../model/graph/DataProcessorInvocation.java       |   2 +-
 .../model/message/NotificationType.java            |   5 +-
 .../pipeline/PipelineElementMigrationEntity.java   |  51 ++
 .../org/apache/streampipes/model/util/Cloner.java  |   2 +-
 streampipes-node-controller-container/pom.xml      |   4 +
 ...yResource.java => DataStreamRelayResource.java} |   8 +-
 .../container/api/InvocableEntityResource.java     |  55 +--
 .../api/NodeControllerResourceConfig.java          |   4 +-
 ...ource.java => NodeInfoDescriptionResource.java} |   4 +-
 .../container/management/node/NodeManager.java     |   5 +-
 .../management/pe/InvocableElementManager.java     | 111 +++--
 .../management/pe/PipelineElementLifeCycle.java    |   3 +-
 .../management/relay/DataStreamRelayManager.java   | 108 ++---
 .../container/management/relay/EventRelay.java     |   6 +-
 .../relay/bridges/MultiBrokerBridge.java           |   3 +-
 .../manager/execution/http/ElementSubmitter.java   | 162 +++++++
 .../manager/execution/http/GraphSubmitter.java     | 208 ++++-----
 .../manager/execution/http/HttpRequestBuilder.java |   9 +-
 .../manager/execution/http/MigrationHelpers.java   |  75 ---
 .../manager/execution/http/PipelineExecutor.java   | 513 ---------------------
 .../http/StreamRelayEndpointUrlGenerator.java      |   2 +-
 .../pipeline/AbstractPipelineExecutor.java         | 458 ++++++++++++++++++
 .../execution/pipeline/PipelineExecutor.java       | 110 +++++
 .../pipeline/PipelineMigrationExecutor.java        | 349 ++++++++++++++
 .../pipeline/PipelineMigrationHelpers.java         |  15 +-
 .../{http => pipeline}/PipelineStorageService.java |   2 +-
 .../execution/pipeline/migration/Command.java      |  17 +-
 .../migration/PipelineElementStartCommand.java     |  21 +-
 .../migration/PipelineElementStopCommand.java      |  20 +-
 .../pipeline/migration/RelayStartCommand.java      |  20 +-
 .../pipeline/migration/RelayStopCommand.java       |  20 +-
 .../manager/matching/InvocationGraphBuilder.java   |   2 +-
 .../migration/PipelineElementMigrationHandler.java | 147 ++++++
 .../manager/node/AbstractClusterManager.java       | 112 +++--
 .../manager/node/AvailableNodesFetcher.java        |   1 +
 .../manager/node/NodeClusterManager.java           | 112 ++++-
 .../streampipes/manager/node/NodeSyncOptions.java  |  16 +-
 .../streampipes/manager/operations/Operations.java |  51 +-
 .../org/apache/streampipes/rest/impl/Node.java     |   6 +-
 .../streampipes/rest/impl/PipelineResource.java    |   9 +-
 .../serializers/json/GsonSerializer.java           |   2 +
 .../jsonld/CustomAnnotationProvider.java           |   2 +
 .../serializers/jsonld/JsonLdTransformer.java      |   1 +
 .../streampipes/storage/api/INoSqlStorage.java     |   2 +
 .../storage/api/INodeDataStreamRelay.java          |  22 +-
 .../storage/couchdb/CouchDbStorageManager.java     |   5 +-
 .../couchdb/impl/NodeDataStreamRelayImpl.java      |  76 +++
 .../streampipes/storage/couchdb/utils/Utils.java   |   6 +
 ui/src/app/core-model/gen/streampipes-model.ts     |  14 +-
 .../pipeline-status-dialog.component.ts            |  12 +-
 54 files changed, 1910 insertions(+), 1124 deletions(-)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelay.java
similarity index 98%
rename from streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelay.java
index addbd55..0078b18 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelay.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.model;
+package org.apache.streampipes.model.eventrelay;
 
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelayContainer.java
similarity index 66%
rename from streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelayContainer.java
index cb1cd40..9db3117 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelayContainer.java
@@ -15,12 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.model;
+package org.apache.streampipes.model.eventrelay;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.vocabulary.StreamPipes;
 
@@ -39,6 +43,13 @@ public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
     private static final long serialVersionUID = -4675162465357705480L;
 
     private static final String prefix = "urn:apache.org:relaystreamcontainer:";
+    private static final String RELAY_SUFFIX = "(Stream Relay)";
+
+    @JsonProperty("_id")
+    private @SerializedName("_id") String couchDbId;
+
+    @JsonProperty("_rev")
+    private @SerializedName("_rev") String couchDbRev;
 
     @OneToOne(fetch = FetchType.EAGER,
             cascade = {CascadeType.PERSIST, CascadeType.MERGE})
@@ -77,6 +88,31 @@ public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
         this.eventRelayStrategy = eventRelayStrategy;
     }
 
+    public SpDataStreamRelayContainer(DataProcessorInvocation desc) {
+        super(desc.getElementId());
+        this.setName(makeRelayName(desc.getName()));
+        this.setEventRelayStrategy(desc.getEventRelayStrategy());
+        this.setRunningStreamRelayInstanceId(desc.getDeploymentRunningInstanceId());
+        this.setInputGrounding(new EventGrounding(desc.getOutputStream().getEventGrounding()));
+        this.setOutputStreamRelays(desc.getOutputStreamRelays());
+        this.setDeploymentTargetNodeHostname(desc.getDeploymentTargetNodeHostname());
+        this.setDeploymentTargetNodePort(desc.getDeploymentTargetNodePort());
+        this.setDeploymentTargetNodeId(desc.getDeploymentTargetNodeId());
+    }
+
+    public SpDataStreamRelayContainer(String runningStreamRelayInstanceId, String eventRelayStrategy, SpDataStream desc,
+                                      List<SpDataStreamRelay> dataStreamRelays) {
+        super(desc.getElementId());
+        this.setRunningStreamRelayInstanceId(runningStreamRelayInstanceId);
+        this.setEventRelayStrategy(eventRelayStrategy);
+        this.setName(makeRelayName(desc.getName()));
+        this.setInputGrounding(new EventGrounding(desc.getEventGrounding()));
+        this.setDeploymentTargetNodeId(desc.getDeploymentTargetNodeId());
+        this.setDeploymentTargetNodeHostname(desc.getDeploymentTargetNodeHostname());
+        this.setDeploymentTargetNodePort(desc.getDeploymentTargetNodePort());
+        this.setOutputStreamRelays(dataStreamRelays);
+    }
+
     public SpDataStreamRelayContainer(NamedStreamPipesEntity other) {
         super(other);
     }
@@ -136,4 +172,24 @@ public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
     public void setDeploymentTargetNodePort(Integer deploymentTargetNodePort) {
         this.deploymentTargetNodePort = deploymentTargetNodePort;
     }
+
+    private String makeRelayName(String name) {
+        return name + " " + RELAY_SUFFIX;
+    }
+
+    public String getCouchDbId() {
+        return couchDbId;
+    }
+
+    public void setCouchDbId(String couchDbId) {
+        this.couchDbId = couchDbId;
+    }
+
+    public String getCouchDbRev() {
+        return couchDbRev;
+    }
+
+    public void setCouchDbRev(String couchDbRev) {
+        this.couchDbRev = couchDbRev;
+    }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/metrics/RelayMetrics.java b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/metrics/RelayMetrics.java
similarity index 96%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/metrics/RelayMetrics.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/metrics/RelayMetrics.java
index 6a019ab..e70667a 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/metrics/RelayMetrics.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/metrics/RelayMetrics.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.management.relay.metrics;
+package org.apache.streampipes.model.eventrelay.metrics;
 
 import org.apache.streampipes.model.grounding.TransportProtocol;
 
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java
index ac43c71..cee32d4 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.model.graph;
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
 import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
 import org.apache.streampipes.model.output.OutputStrategy;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index 5c3236b..1e7a4d6 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.model.graph;
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.output.OutputStrategy;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
index 4a1c81e..d7bf388 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
@@ -68,7 +68,10 @@ public enum NotificationType {
 	INSTALLATION_SUCCESSFUL("Installation successful", ""), 
 	
 	PROPERTY_FILE_WRITTEN("Writing properties file...", ""), 
-	ADMIN_USER_CREATED("Creating admin user...", "");
+	ADMIN_USER_CREATED("Creating admin user...", ""),
+
+	NODE_JOIN_SUCCESS("Success", "Node successfully joined"),
+	NODE_JOIN_ERROR("Error", "Node could not be joined");
 
 
 	private final String title;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementMigrationEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementMigrationEntity.java
new file mode 100644
index 0000000..1973d6e
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementMigrationEntity.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.pipeline;
+
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+public class PipelineElementMigrationEntity {
+
+    private InvocableStreamPipesEntity sourceElement;
+    private InvocableStreamPipesEntity targetElement;
+
+    public PipelineElementMigrationEntity() {
+    }
+
+    public PipelineElementMigrationEntity(InvocableStreamPipesEntity sourceElement,
+                                          InvocableStreamPipesEntity targetElement) {
+        this.sourceElement = sourceElement;
+        this.targetElement = targetElement;
+    }
+
+    public InvocableStreamPipesEntity getSourceElement() {
+        return sourceElement;
+    }
+
+    public void setSourceElement(InvocableStreamPipesEntity sourceElement) {
+        this.sourceElement = sourceElement;
+    }
+
+    public InvocableStreamPipesEntity getTargetElement() {
+        return targetElement;
+    }
+
+    public void setTargetElement(InvocableStreamPipesEntity targetElement) {
+        this.targetElement = targetElement;
+    }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
index 2ac64de..dc92659 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.model.util;
 
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
 import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.model.output.*;
 import org.apache.streampipes.model.resource.Hardware;
diff --git a/streampipes-node-controller-container/pom.xml b/streampipes-node-controller-container/pom.xml
index 4076064..d5e0f49 100644
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@ -131,6 +131,10 @@
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-jaxb-annotations</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataStreamRelayResource.java
similarity index 82%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataStreamRelayResource.java
index 00bb34a..0883a68 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataStreamRelayResource.java
@@ -17,7 +17,7 @@
  */
 package org.apache.streampipes.node.controller.container.api;
 
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 
@@ -25,7 +25,7 @@ import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 
 @Path("/api/v2/node/stream/relay")
-public class AdapterDataStreamRelayResource extends AbstractResource {
+public class DataStreamRelayResource extends AbstractResource {
 
     @POST
     @JacksonSerialized
@@ -33,7 +33,7 @@ public class AdapterDataStreamRelayResource extends AbstractResource {
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     public javax.ws.rs.core.Response invoke(SpDataStreamRelayContainer graph) {
-        return ok(DataStreamRelayManager.getInstance().startAdapterDataStreamRelay(graph));
+        return ok(DataStreamRelayManager.getInstance().start(graph));
     }
 
     @DELETE
@@ -41,6 +41,6 @@ public class AdapterDataStreamRelayResource extends AbstractResource {
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     public javax.ws.rs.core.Response detach(@PathParam("runningInstanceId") String runningInstanceId) {
-        return ok(DataStreamRelayManager.getInstance().stopAdapterDataStreamRelay(runningInstanceId));
+        return ok(DataStreamRelayManager.getInstance().stop(runningInstanceId));
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 57f8c29..6028438 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -21,12 +21,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
 import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
-import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
@@ -34,12 +31,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.*;
-import java.lang.annotation.Annotation;
-import java.net.URI;
-import java.util.Date;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
 
 @Path("/api/v2/node/element")
 public class InvocableEntityResource extends AbstractResource {
@@ -62,26 +53,22 @@ public class InvocableEntityResource extends AbstractResource {
     @Produces(MediaType.APPLICATION_JSON)
     public javax.ws.rs.core.Response invoke(@PathParam("identifier") String identifier,
                                             @PathParam("elementId") String elementId, InvocableStreamPipesEntity graph) {
-        String endpoint;
 
         if (identifier.equals(DATA_PROCESSOR_PREFIX)) {
-            endpoint = graph.getBelongsTo();
-            DataStreamRelayManager.getInstance().startPipelineElementDataStreamRelay((DataProcessorInvocation) graph);
-            Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
-            if (resp.isSuccess()) {
+            Response elementResponse = InvocableElementManager.getInstance().invoke(graph);
+            if (elementResponse.isSuccess()) {
                 RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
             }
-            return ok(resp);
+            return ok(elementResponse);
         }
         // Currently no data sinks are registered at node controller. If we, at some point, want to also run data
         // sinks on edge nodes we need to register there Declarer at the node controller one startup.
         else if (identifier.equals(DATA_SINK_PREFIX)) {
-            endpoint = graph.getBelongsTo();
-            Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
-            if (resp.isSuccess()) {
+            Response elementResponse = InvocableElementManager.getInstance().invoke(graph);
+            if (elementResponse.isSuccess()) {
                 RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
             }
-            return ok(resp);
+            return ok(elementResponse);
         }
 
         return ok();
@@ -96,37 +83,7 @@ public class InvocableEntityResource extends AbstractResource {
         String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
         Response resp = InvocableElementManager.getInstance().detach(endpoint + SLASH + runningInstanceId);
         RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
-        DataStreamRelayManager.getInstance().stopPipelineElementDataStreamRelay(runningInstanceId);
 
         return ok(resp);
     }
-
-
-    @DELETE
-    @Path("{identifier}/{elementId}/{runningInstanceId}/relay")
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response detachRelay(@PathParam("identifier") String identifier,
-                                            @PathParam("elementId") String elementId,
-                                            @PathParam("runningInstanceId") String runningInstanceId) {
-        return DataStreamRelayManager.getInstance().stopDataStreamRelay(runningInstanceId);
-    }
-
-    @POST
-    @Path("{identifier}/{elementId}/{runningInstanceId}/relay")
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response invokeRelay(@PathParam("identifier") String identifier,
-                                @PathParam("elementId") String elementId,
-                                @PathParam("runningInstanceId") String runningInstanceId,
-                                SpDataStreamRelayContainer relay) {
-        return DataStreamRelayManager.getInstance().startDataStreamRelay(relay, runningInstanceId);
-    }
-
-    private String toJson(InvocableStreamPipesEntity graph) {
-        try {
-            return JacksonSerializer.getObjectMapper().writeValueAsString(graph);
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        }
-        throw new SpRuntimeException("Could not serialize object: " + graph);
-    }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
index 5fcd3dc..ac277e8 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
@@ -25,9 +25,9 @@ public class NodeControllerResourceConfig extends ResourceConfig {
 
     public NodeControllerResourceConfig() {
         register(HealthCheckResource.class);
-        register(InfoStatusResource.class);
+        register(NodeInfoDescriptionResource.class);
         register(InvocableEntityResource.class);
-        register(AdapterDataStreamRelayResource.class);
+        register(DataStreamRelayResource.class);
         register(ConnectResource.class);
         register(ContainerResource.class);
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoDescriptionResource.java
similarity index 97%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoDescriptionResource.java
index 7e7b36a..6bbe2f5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoDescriptionResource.java
@@ -28,7 +28,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 @Path("/api/v2/node/info")
-public class InfoStatusResource extends AbstractResource {
+public class NodeInfoDescriptionResource extends AbstractResource {
 
     private static final String ACTIVATE = "activate";
     private static final String DEACTIVATE = "deactivate";
@@ -70,6 +70,6 @@ public class InfoStatusResource extends AbstractResource {
     @Path("/relays")
     @Produces(MediaType.APPLICATION_JSON)
     public Response getAllRelays() {
-        return ok(DataStreamRelayManager.getInstance().getAllRelays());
+        return ok(DataStreamRelayManager.getInstance().getAllRelaysMetrics());
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
index d9ec40d..0f01751 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
@@ -127,13 +127,13 @@ public class NodeManager {
     }
 
     public Message activate() {
-        LOG.info("Deactivate node controller");
+        LOG.info("Activate node controller");
         this.nodeInfo.setActive(true);
         return Notifications.success(NotificationType.OPERATION_SUCCESS);
     }
 
     public Message deactivate() {
-        LOG.info("Activate node controller");
+        LOG.info("Deactivate node controller");
         this.nodeInfo.setActive(false);
         return Notifications.success(NotificationType.OPERATION_SUCCESS);
     }
@@ -151,6 +151,7 @@ public class NodeManager {
         SuccessMessage message = JacksonSerializer
                 .getObjectMapper()
                 .readValue(resp, SuccessMessage.class);
+        LOG.info(message.getNotifications().get(0).getDescription());
         return message.isSuccess();
     }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 4db36b3..9891ee6 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -17,12 +17,16 @@
  */
 package org.apache.streampipes.node.controller.container.management.pe;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.gson.JsonSyntaxException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.node.NodeManager;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
@@ -31,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 
 public class InvocableElementManager implements PipelineElementLifeCycle {
 
@@ -58,50 +63,19 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
 
     @Override
     public void register(InvocableRegistration registration) {
-        try {
-            Request.Put(makeConsulRegistrationEndpoint())
-                    .addHeader("accept", "application/json")
-                    .body(new StringEntity(JacksonSerializer
-                            .getObjectMapper()
-                            .writeValueAsString(registration.getConsulServiceRegistrationBody())))
-                    .execute();
-
-            // TODO: persistent storage to survive failures
-            NodeManager.getInstance()
-                    .retrieveNodeInfoDescription()
-                    .setSupportedElements(registration.getSupportedPipelineElementAppIds());
-
-            String url = "http://"
-                            + NodeControllerConfig.INSTANCE.getBackendHost()
-                            + ":"
-                            + NodeControllerConfig.INSTANCE.getBackendPort()
-                            + "/"
-                            + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes"
-                            + "/"
-                            + NodeControllerConfig.INSTANCE.getNodeControllerId();
-
-            String desc = JacksonSerializer.getObjectMapper()
-                    .writeValueAsString(NodeManager.getInstance().retrieveNodeInfoDescription());
-
-            Request.Put(url)
-                    .bodyString(desc, ContentType.APPLICATION_JSON)
-//                    .connectTimeout(1000)
-//                    .socketTimeout(100000)
-                    .execute();
-
-            LOG.info("Successfully registered pipeline element container");
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+        registerAtConsul(registration);
+        updateAndSyncNodeInfoDescription(registration);
+        LOG.info("Successfully registered pipeline element container");
     }
 
     @Override
-    public Response invoke(String endpoint, String payload) {
+    public Response invoke(InvocableStreamPipesEntity graph) {
+        String endpoint = graph.getBelongsTo();
         LOG.info("Invoke pipeline element: {}", endpoint);
         try {
             org.apache.http.client.fluent.Response httpResp = Request
                     .Post(endpoint)
-                    .bodyString(payload, ContentType.APPLICATION_JSON)
+                    .bodyString(toJson(graph), ContentType.APPLICATION_JSON)
                     .connectTimeout(CONNECT_TIMEOUT)
                     .execute();
             return handleResponse(httpResp);
@@ -129,29 +103,58 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
     @Override
     public void unregister(){
         // TODO: unregister element from Consul and
+        setSupportedPipelineElements(Collections.emptyList());
+        try {
+            String url = generateBackendEndpoint();
+            String desc = toJson(getNodeInfoDescription());
+            Request.Put(url)
+                    .bodyString(desc, ContentType.APPLICATION_JSON)
+                    .execute();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void updateAndSyncNodeInfoDescription(InvocableRegistration registration) {
+        setSupportedPipelineElements(registration.getSupportedPipelineElementAppIds());
+        try {
+            String url = generateBackendEndpoint();
+            String desc = toJson(getNodeInfoDescription());
+            Request.Put(url)
+                    .bodyString(desc, ContentType.APPLICATION_JSON)
+                    .execute();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private NodeInfoDescription getNodeInfoDescription() {
+        return NodeManager.getInstance().retrieveNodeInfoDescription();
+    }
+
+    private void setSupportedPipelineElements(List<String> supportedPipelineElements) {
         NodeManager.getInstance()
                 .retrieveNodeInfoDescription()
-                .setSupportedElements(Collections.emptyList());
+                .setSupportedElements(supportedPipelineElements);
+    }
 
-        String url = "http://"
+    private String generateBackendEndpoint() {
+        return HTTP_PROTOCOL
                 + NodeControllerConfig.INSTANCE.getBackendHost()
-                + ":"
+                + COLON
                 + NodeControllerConfig.INSTANCE.getBackendPort()
-                + "/"
+                + SLASH
                 + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes"
-                + "/"
+                + SLASH
                 + NodeControllerConfig.INSTANCE.getNodeControllerId();
+    }
 
+    private void registerAtConsul(InvocableRegistration registration) {
         try {
-            String desc = JacksonSerializer.getObjectMapper()
-                    .writeValueAsString(NodeManager.getInstance().retrieveNodeInfoDescription());
-
-            Request.Put(url)
-                    .bodyString(desc, ContentType.APPLICATION_JSON)
-                    .connectTimeout(1000)
-                    .socketTimeout(100000)
+            Request.Put(makeConsulRegistrationEndpoint())
+                    .addHeader("accept", "application/json")
+                    .body(new StringEntity(toJson(registration.getConsulServiceRegistrationBody())))
                     .execute();
-
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -183,4 +186,12 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
         }
     }
 
+    private <T> String toJson(T element) {
+        try {
+            return JacksonSerializer.getObjectMapper().writeValueAsString(element);
+        } catch (JsonProcessingException e) {
+            throw new SpRuntimeException("Could not serialize object: " + element, e);
+        }
+    }
+
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
index 0ad32ee..107c717 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
@@ -19,12 +19,13 @@ package org.apache.streampipes.node.controller.container.management.pe;
 
 import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
 public interface PipelineElementLifeCycle {
 
     void register(InvocableRegistration registration);
 
-    Response invoke(String endpoint, String payload);
+    Response invoke(InvocableStreamPipesEntity graph);
 
     Response detach(String runningInstanceId);
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
index 638e9bc..0fbe538 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
@@ -17,16 +17,17 @@
  */
 package org.apache.streampipes.node.controller.container.management.relay;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.SpDataStreamRelay;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.grounding.TransportProtocol;
-import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
+import org.apache.streampipes.model.eventrelay.metrics.RelayMetrics;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+
+import java.util.*;
 import java.util.stream.Collectors;
 
 public class DataStreamRelayManager {
@@ -45,80 +46,65 @@ public class DataStreamRelayManager {
         return instance;
     }
 
-    public Response startAdapterDataStreamRelay(SpDataStreamRelayContainer desc) {
-        String strategy = desc.getEventRelayStrategy();
-        String runningInstanceId = desc.getRunningStreamRelayInstanceId();
-        TransportProtocol source = desc.getInputGrounding().getTransportProtocol();
-
-        Map<String, EventRelay> eventRelayMap = new HashMap<>();
-
-        desc.getOutputStreamRelays().forEach(r -> {
-            TransportProtocol target = r.getEventGrounding().getTransportProtocol();
-            EventRelay eventRelay = new EventRelay(source, target, strategy);
-            eventRelay.start();
-            eventRelayMap.put(r.getElementId(), eventRelay);
-        });
-        RunningRelayInstances.INSTANCE.add(desc.getRunningStreamRelayInstanceId(), eventRelayMap);
-        return new Response(runningInstanceId,true,"");
-    }
-
-    public Response stopAdapterDataStreamRelay(String id) {
-        return stopDataStreamRelay(id);
+    public Response start(InvocableStreamPipesEntity graph) {
+        return start(convert(graph));
     }
 
-
-    public Response startDataStreamRelay(SpDataStreamRelayContainer desc, String id) {
+    public Response start(SpDataStreamRelayContainer desc) {
         String strategy = desc.getEventRelayStrategy();
+        String id = desc.getRunningStreamRelayInstanceId();
         TransportProtocol source = desc.getInputGrounding().getTransportProtocol();
 
         Map<String, EventRelay> eventRelayMap = new HashMap<>();
 
-        desc.getOutputStreamRelays().forEach(r -> {
-            TransportProtocol target = r.getEventGrounding().getTransportProtocol();
-            EventRelay eventRelay = new EventRelay(source, target, strategy);
-            eventRelay.start();
-            eventRelayMap.put(r.getElementId(), eventRelay);
+        // start data stream relay
+        // 1:1 mapping -> remote forward
+        // 1:n mapping -> remote fan-out
+        desc.getOutputStreamRelays().forEach(relay -> {
+            // add new relay if not running
+            if(!runningRelayExists(id, relay)) {
+                TransportProtocol target = relay.getEventGrounding().getTransportProtocol();
+                EventRelay eventRelay = new EventRelay(source, target, strategy);
+                eventRelay.start();
+                eventRelayMap.put(relay.getElementId(), eventRelay);
+            }
         });
-        RunningRelayInstances.INSTANCE.add(id, eventRelayMap);
+        RunningRelayInstances.INSTANCE.add(desc.getRunningStreamRelayInstanceId(), eventRelayMap);
         return new Response(id,true,"");
     }
 
-    public Response stopDataStreamRelay(String id) {
-        Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(id);
-        if (relay != null) {
-            relay.values().forEach(EventRelay::stop);
+    public Response stop(String id) {
+        Map<String, EventRelay> relays = RunningRelayInstances.INSTANCE.get(id);
+        if (relays != null) {
+            relays.values().forEach(EventRelay::stop);
         }
         RunningRelayInstances.INSTANCE.remove(id);
         return new Response(id, true, "");
     }
 
-    public void startPipelineElementDataStreamRelay(DataProcessorInvocation graph) {
-        TransportProtocol source = graph
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocol();
-
-        String strategy = graph.getEventRelayStrategy();
-        Map<String, EventRelay> eventRelayMap = new HashMap<>();
-
-        List<SpDataStreamRelay> dataStreamRelays = graph.getOutputStreamRelays();
-        dataStreamRelays.forEach(r -> {
-            TransportProtocol target = r.getEventGrounding().getTransportProtocol();
-            EventRelay eventRelay = new EventRelay(source, target, strategy);
-            eventRelay.start();
-            eventRelayMap.put(r.getElementId(), eventRelay);
-        });
-        RunningRelayInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), eventRelayMap);
-    }
-
-    public void stopPipelineElementDataStreamRelay(String id) {
-        stopDataStreamRelay(id);
-    }
-
-    public List<RelayMetrics> getAllRelays() {
+    public List<RelayMetrics> getAllRelaysMetrics() {
        return RunningRelayInstances.INSTANCE.getRunningInstances()
                     .stream()
                     .map(EventRelay::getRelayMetrics)
                     .collect(Collectors.toList());
     }
+
+    private boolean runningRelayExists(String id, SpDataStreamRelay relay) {
+        Map<String, EventRelay> relays = RunningRelayInstances.INSTANCE.get(id);
+        if (relays != null) {
+            return relays.keySet().stream()
+                    .anyMatch(eventRelay -> eventRelay.equals(relay.getElementId()));
+        } else {
+            return false;
+        }
+    }
+
+    // Helpers
+
+    private SpDataStreamRelayContainer convert(InvocableStreamPipesEntity graph) {
+        if (graph instanceof DataProcessorInvocation) {
+            return new SpDataStreamRelayContainer((DataProcessorInvocation) graph);
+        }
+        throw new SpRuntimeException("Could not convert pipeline element description to data stream relay");
+    }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
index 7c52867..f22a815 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
@@ -19,7 +19,7 @@ package org.apache.streampipes.node.controller.container.management.relay;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.grounding.*;
-import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
+import org.apache.streampipes.model.eventrelay.metrics.RelayMetrics;
 
 public class EventRelay extends BaseEventRelay {
 
@@ -29,11 +29,11 @@ public class EventRelay extends BaseEventRelay {
         super(source, target, DEFAULT_EVENT_RELAY_STRATEGY);
     }
 
-    public EventRelay(TransportProtocol source, TransportProtocol target, String relayStrategy) {
+    public EventRelay(TransportProtocol source, TransportProtocol target, String relayStrategy)  {
         super(source, target, relayStrategy);
     }
 
-    public void start() throws SpRuntimeException{
+    public void start() throws SpRuntimeException {
         this.multiBrokerBridge.start();
     }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
index cc7c0c5..09b9057 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
@@ -21,10 +21,9 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.messaging.EventConsumer;
 import org.apache.streampipes.messaging.EventProducer;
 import org.apache.streampipes.messaging.EventRelay;
-import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.grounding.*;
+import org.apache.streampipes.model.eventrelay.metrics.RelayMetrics;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
 import org.apache.streampipes.sdk.helpers.Tuple3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
new file mode 100644
index 0000000..57715b9
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+public abstract class ElementSubmitter {
+    protected final static Logger LOG = LoggerFactory.getLogger(ElementSubmitter.class);
+
+    protected final String pipelineId;
+    protected final String pipelineName;
+    protected final List<InvocableStreamPipesEntity> graphs;
+    protected final List<SpDataSet> dataSets;
+    protected final List<SpDataStreamRelayContainer> relays;
+
+    public ElementSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
+                            List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> relays) {
+        this.pipelineId = pipelineId;
+        this.pipelineName = pipelineName;
+        this.graphs = graphs;
+        this.dataSets = dataSets;
+        this.relays = relays;
+    }
+
+    protected abstract PipelineOperationStatus invokePipelineElementsAndRelays();
+
+    protected abstract PipelineOperationStatus detachPipelineElementsAndRelays();
+
+    protected abstract PipelineOperationStatus invokeRelaysOnMigration();
+
+    protected abstract PipelineOperationStatus detachRelaysOnMigration();
+
+
+    protected void invokePipelineElements(PipelineOperationStatus status) {
+        graphs.forEach(graph ->
+                status.addPipelineElementStatus(makeInvokeHttpRequest(new InvocableEntityUrlGenerator(graph), graph)));
+    }
+
+    protected void detachPipelineElements(PipelineOperationStatus status) {
+        graphs.forEach(graph ->
+                status.addPipelineElementStatus(makeDetachHttpRequest(new InvocableEntityUrlGenerator(graph), graph)));
+    }
+
+    protected void invokeDataSets(PipelineOperationStatus status) {
+        dataSets.forEach(dataset ->
+                status.addPipelineElementStatus(makeInvokeHttpRequest(new DataSetEntityUrlGenerator(dataset), dataset)));
+    }
+
+    protected void detachDataSets(PipelineOperationStatus status) {
+        dataSets.forEach(dataset ->
+                status.addPipelineElementStatus(makeDetachHttpRequest(new DataSetEntityUrlGenerator(dataset), dataset)));
+    }
+
+    protected void invokeRelays(PipelineOperationStatus status) {
+        relays.forEach(relay ->
+                status.addPipelineElementStatus(makeInvokeHttpRequest(new StreamRelayEndpointUrlGenerator(relay), relay)));
+    }
+
+    protected void detachRelays(PipelineOperationStatus status) {
+        relays.forEach(relay ->
+                status.addPipelineElementStatus(makeDetachHttpRequest(new StreamRelayEndpointUrlGenerator(relay), relay)));
+    }
+
+
+    protected PipelineElementStatus makeInvokeHttpRequest(EndpointUrlGenerator<?> urlGenerator,
+                                                        NamedStreamPipesEntity entity) {
+        if (entity instanceof SpDataStreamRelay) {
+            // data stream relays
+            return new HttpRequestBuilder(entity, urlGenerator.generateRelayEndpoint()).invoke();
+        } else {
+            // data sets, data processors, data sinks
+            return new HttpRequestBuilder(entity, urlGenerator.generateInvokeEndpoint()).invoke();
+        }
+    }
+
+    protected PipelineElementStatus makeDetachHttpRequest(EndpointUrlGenerator<?> urlGenerator,
+                                                        NamedStreamPipesEntity entity) {
+        if (entity instanceof SpDataStreamRelay) {
+            // data stream relays
+            return new HttpRequestBuilder(entity, urlGenerator.generateRelayEndpoint()).detach();
+        } else {
+            // data sets, data processors, data sinks
+            return new HttpRequestBuilder(entity, urlGenerator.generateDetachEndpoint()).detach();
+        }
+    }
+
+    protected PipelineOperationStatus verifyPipelineOperationStatus(PipelineOperationStatus status, String successMessage,
+                                                                  String errorMessage, boolean rollbackIfFailed) {
+        status.setSuccess(status.getElementStatus().stream()
+                .allMatch(PipelineElementStatus::isSuccess));
+
+        if (status.isSuccess()) {
+            status.setTitle(successMessage);
+        } else {
+            if (rollbackIfFailed) {
+                LOG.info("Could not start pipeline, initializing rollback...");
+                rollbackInvokedEntities(status);
+            }
+            status.setTitle(errorMessage);
+        }
+        return status;
+    }
+
+    private void rollbackInvokedEntities(PipelineOperationStatus status) {
+        for (PipelineElementStatus s : status.getElementStatus()) {
+            if (s.isSuccess()) {
+                Optional<InvocableStreamPipesEntity> graphs = findGraphs(s.getElementId());
+                graphs.ifPresent(graph -> {
+                    LOG.info("Rolling back element " + graph.getElementId());
+                    makeDetachHttpRequest(new InvocableEntityUrlGenerator(graph), graph);
+                });
+            }
+        }
+    }
+
+    // Helper methods
+
+    protected PipelineOperationStatus initPipelineOperationStatus() {
+        PipelineOperationStatus status = new PipelineOperationStatus();
+        status.setPipelineId(pipelineId);
+        status.setPipelineName(pipelineName);
+        return status;
+    }
+
+    private Optional<InvocableStreamPipesEntity> findGraphs(String elementId) {
+        return graphs.stream().filter(i -> i.getBelongsTo().equals(elementId)).findFirst();
+    }
+
+    protected boolean allInvocableEntitiesRunning(PipelineOperationStatus status) {
+        return status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess);
+    }
+
+    protected boolean relaysExist() {
+        return relays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0);
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index ab14532..726d1a6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -18,88 +18,57 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
-public class GraphSubmitter {
+public class GraphSubmitter extends ElementSubmitter {
 
-  private final static Logger LOG = LoggerFactory.getLogger(GraphSubmitter.class);
-
-  private final List<InvocableStreamPipesEntity> graphs;
-  private final List<SpDataSet> dataSets;
-  private final String pipelineId;
-  private final String pipelineName;
-  private final List<SpDataStreamRelayContainer> streamRelays;
-
-  public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
-                        List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> streamRelays) {
-    this.graphs = graphs;
-    this.pipelineId = pipelineId;
-    this.pipelineName = pipelineName;
-    this.dataSets = dataSets;
-    this.streamRelays = streamRelays;
+  /**
+   * Creates a submitter for the given pipeline without any graphs, data sets, or relays: three new empty lists
+   * are handed to the superclass constructor.
+   *
+   * @param pipelineId   id of the pipeline, forwarded to the superclass
+   * @param pipelineName name of the pipeline, forwarded to the superclass
+   */
+  public GraphSubmitter(String pipelineId, String pipelineName) {
+    super(pipelineId, pipelineName, new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
   }
 
-
-  public PipelineOperationStatus invokeRelays(Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays) {
-    PipelineOperationStatus status = initPipelineOperationStatus();
-
-    relays.entrySet().forEach(e -> {
-      if(e.getValue().getOutputStreamRelays().size() > 0){
-          if(e.getKey() instanceof DataProcessorInvocation)
-            status.addPipelineElementStatus(makeHttpRequest(new InvocableEntityUrlGenerator((DataProcessorInvocation) e.getKey()), e.getValue(), "invokeRelay"));
-          else
-            status.addPipelineElementStatus(makeHttpRequest(new StreamRelayEndpointUrlGenerator(e.getValue()), e.getValue(), "invoke"));
-      }
-    });
-
-    return verifyPipelineOperationStatus(
-            status,
-            "Successfully started relays in pipeline " + pipelineName,
-            "Could not start relays in pipeline" + pipelineName,
-            true);
+  /**
+   * Creates a submitter for the given pipeline and graphs; new empty lists are handed to the superclass
+   * constructor for the data sets and relays.
+   *
+   * @param pipelineId   id of the pipeline, forwarded to the superclass
+   * @param pipelineName name of the pipeline, forwarded to the superclass
+   * @param graphs       invocable entities, forwarded to the superclass
+   */
+  public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs) {
+    super(pipelineId, pipelineName, graphs, new ArrayList<>(), new ArrayList<>());
   }
 
-  public PipelineOperationStatus detachRelays(Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays) {
-    PipelineOperationStatus status = initPipelineOperationStatus();
-
-    relays.entrySet().forEach(e -> {
-      if(e.getValue().getOutputStreamRelays().size() > 0){
-        if(e.getKey() instanceof DataProcessorInvocation)
-          status.addPipelineElementStatus(makeHttpRequest(new InvocableEntityUrlGenerator((DataProcessorInvocation) e.getKey()), e.getValue(), "detachRelay"));
-        else
-          status.addPipelineElementStatus(makeHttpRequest(new StreamRelayEndpointUrlGenerator(e.getValue()), e.getValue(), "detach"));
-      }
-    });
-
-    return verifyPipelineOperationStatus(
-            status,
-            "Successfully stopped relays in pipeline " + pipelineName,
-            "Could not stop all relays in pipeline " + pipelineName,
-            false);
+  /**
+   * Creates a submitter for the given pipeline; all arguments are forwarded unchanged to the superclass
+   * constructor.
+   *
+   * @param pipelineId   id of the pipeline
+   * @param pipelineName name of the pipeline
+   * @param graphs       invocable entities of the pipeline
+   * @param dataSets     data sets of the pipeline
+   * @param relays       relay containers of the pipeline
+   */
+  public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
+                        List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> relays) {
+    super(pipelineId, pipelineName, graphs, dataSets, relays);
   }
 
-  public PipelineOperationStatus invokeGraphs() {
+  /**
+   * Called when the pipeline is started. Invokes all pipeline elements, including relays between adjacent pipeline
+   * element nodes in the pipeline graph, in the following order:
+   *
+   * - start relays (if any)
+   * - start pipeline elements
+   * - start data sets (only if pipeline elements and relays started)
+   *
+   * We verify the status of this procedure and trigger a graceful rollback in case of any failure. In any case, we
+   * return an appropriate success/error message.
+   *
+   * @return PipelineOperationStatus
+   */
+  @Override
+  public PipelineOperationStatus invokePipelineElementsAndRelays() {
     PipelineOperationStatus status = initPipelineOperationStatus();
 
-    if (streamRelays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0)) {
-      streamRelays.forEach(streamRelay -> invoke(new StreamRelayEndpointUrlGenerator(streamRelay), streamRelay, status));
+    if (relaysExist()) {
+      invokeRelays(status);
     }
-    graphs.forEach(graph -> invoke(new InvocableEntityUrlGenerator(graph), graph, status));
-    // only invoke datasets when following pipeline elements are started
+
+    invokePipelineElements(status);
+
     if (allInvocableEntitiesRunning(status)) {
-        dataSets.forEach(dataset -> invoke(new DataSetEntityUrlGenerator(dataset), dataset, status));
+      invokeDataSets(status);
     }
 
     return verifyPipelineOperationStatus(
@@ -109,13 +78,27 @@ public class GraphSubmitter {
             true);
   }
 
-  public PipelineOperationStatus detachGraphs() {
+  /**
+   * Called when the pipeline is stopped. Detaches all pipeline elements, including relays between adjacent pipeline
+   * element nodes in the pipeline graph, in the following order:
+   *
+   * - stop pipeline elements
+   * - stop data sets
+   * - stop relays (if any)
+   *
+   * We verify the status of this procedure and return the success/error message.
+   *
+   * @return PipelineOperationStatus
+   */
+  @Override
+  public PipelineOperationStatus detachPipelineElementsAndRelays() {
     PipelineOperationStatus status = initPipelineOperationStatus();
 
-    graphs.forEach(graph -> detach(new InvocableEntityUrlGenerator(graph), graph, status));
-    dataSets.forEach(dataset -> detach(new DataSetEntityUrlGenerator(dataset), dataset, status));
-    if (streamRelays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0)) {
-      streamRelays.forEach(streamRelay -> detach(new StreamRelayEndpointUrlGenerator(streamRelay), streamRelay, status));
+    detachPipelineElements(status);
+    detachDataSets(status);
+
+    if (relaysExist()) {
+      detachRelays(status);
     }
 
     return verifyPipelineOperationStatus(
@@ -125,74 +108,41 @@ public class GraphSubmitter {
             false);
   }
 
-  private PipelineOperationStatus verifyPipelineOperationStatus(PipelineOperationStatus status, String successMessage,
-                                             String errorMessage, boolean rollbackIfFailed) {
-    status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
-    if (status.isSuccess()) {
-      status.setTitle(successMessage);
-    } else {
-      if (rollbackIfFailed) {
-        LOG.info("Could not start pipeline, initializing rollback...");
-        rollbackInvokedEntities(status);
-      }
-      status.setTitle(errorMessage);
-    }
-    return status;
-  }
-
-  private void rollbackInvokedEntities(PipelineOperationStatus status) {
-    for (PipelineElementStatus s : status.getElementStatus()) {
-      if (s.isSuccess()) {
-        Optional<InvocableStreamPipesEntity> graphs = findGraphs(s.getElementId());
-        graphs.ifPresent(graph -> {
-          LOG.info("Rolling back element " + graph.getElementId());
-          makeHttpRequest(new InvocableEntityUrlGenerator(graph), graph, "detach");
-        });
-      }
-    }
-  }
+  /**
+   * Called when pipeline elements are migrated and new relays need to be invoked between adjacent pipeline
+   * elements. Creates a fresh status, delegates to {@code invokeRelays(status)}, and hands the success/error
+   * titles to {@code verifyPipelineOperationStatus}.
+   *
+   * NOTE(review): the trailing {@code true} is presumably the rollback-on-failure flag - confirm against
+   * ElementSubmitter.
+   *
+   * @return PipelineOperationStatus of the relay invocation
+   */
+  @Override
+  public PipelineOperationStatus invokeRelaysOnMigration() {
+    PipelineOperationStatus status = initPipelineOperationStatus();
 
-  private void invoke(EndpointUrlGenerator<?> urlGenerator,
-                      NamedStreamPipesEntity namedEntity, PipelineOperationStatus status) {
-    status.addPipelineElementStatus(makeHttpRequest(urlGenerator, namedEntity, "invoke"));
-  }
+    invokeRelays(status);
 
-  private void detach(EndpointUrlGenerator<?> urlGenerator,
-                      NamedStreamPipesEntity namedEntity, PipelineOperationStatus status) {
-    status.addPipelineElementStatus(makeHttpRequest(urlGenerator, namedEntity, "detach"));
+    // The trailing space keeps the pipeline name from being glued onto the word "pipeline" in the error title.
+    return verifyPipelineOperationStatus(
+            status,
+            "Successfully started relays in pipeline " + pipelineName,
+            "Could not start relays in pipeline " + pipelineName,
+            true);
   }
 
-  // Helper methods
-
-  private PipelineOperationStatus initPipelineOperationStatus() {
-    PipelineOperationStatus status = new PipelineOperationStatus();
-    status.setPipelineId(pipelineId);
-    status.setPipelineName(pipelineName);
-    return status;
-  }
+  /**
+   * Called when pipeline elements are migrated and relays need to be detached, e.g. relays from predecessors to
+   * the old pipeline element. Creates a fresh status, delegates to {@code detachRelays(status)}, and hands the
+   * success/error titles to {@code verifyPipelineOperationStatus}.
+   *
+   * NOTE(review): the trailing {@code false} is presumably the rollback-on-failure flag (no rollback when
+   * detaching) - confirm against ElementSubmitter.
+   *
+   * @return PipelineOperationStatus of the relay detachment
+   */
+  @Override
+  public PipelineOperationStatus detachRelaysOnMigration() {
+    PipelineOperationStatus status = initPipelineOperationStatus();
 
-  private PipelineElementStatus makeHttpRequest(EndpointUrlGenerator<?> urlGenerator,
-                                                NamedStreamPipesEntity namedEntity, String type) {
-    switch (type) {
-      case "invoke":
-        return new HttpRequestBuilder(namedEntity, urlGenerator.generateInvokeEndpoint()).invoke();
-      case "detach":
-        return new HttpRequestBuilder(namedEntity, urlGenerator.generateDetachEndpoint()).detach();
-      case "invokeRelay":
-        return new HttpRequestBuilder(namedEntity, urlGenerator.generateRelayEndpoint()).invoke();
-      case "detachRelay":
-        return new HttpRequestBuilder(namedEntity, urlGenerator.generateRelayEndpoint()).detach();
-      default:
-        throw new IllegalArgumentException("Type not known: " + type);
-    }
-  }
+    detachRelays(status);
 
-  private Optional<InvocableStreamPipesEntity> findGraphs(String elementId) {
-    return graphs.stream().filter(i -> i.getBelongsTo().equals(elementId)).findFirst();
+    return verifyPipelineOperationStatus(
+            status,
+            "Successfully stopped relays in pipeline " + pipelineName,
+            "Could not stop all relays in pipeline " + pipelineName,
+            false);
   }
 
-  private boolean allInvocableEntitiesRunning(PipelineOperationStatus status) {
-    return status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess);
-  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index 7097577..dc0fdd9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@ -18,21 +18,20 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.gson.Gson;
+
 import com.google.gson.JsonSyntaxException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
-import org.apache.streampipes.commons.Utils;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
-import org.apache.streampipes.serializers.jsonld.JsonLdTransformer;
+
 
 import java.io.IOException;
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/MigrationHelpers.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/MigrationHelpers.java
deleted file mode 100644
index 43aa454..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/MigrationHelpers.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public class MigrationHelpers {
-
-    static public List<Tuple2<DataProcessorInvocation, DataProcessorInvocation>> getDelta(Pipeline pipelineX, Pipeline pipelineY){
-        List<Tuple2<DataProcessorInvocation, DataProcessorInvocation>> delta = new ArrayList<>();
-        pipelineX.getSepas().forEach(iX -> {
-            if (pipelineY.getSepas().stream().filter(iY -> iY.getElementId().equals(iX.getElementId()))
-                    .noneMatch(iY -> iY.getDeploymentTargetNodeId().equals(iX.getDeploymentTargetNodeId()))){
-                Optional<DataProcessorInvocation> invocationY = pipelineY.getSepas().stream().
-                        filter(iY -> iY.getDeploymentRunningInstanceId().equals(iX.getDeploymentRunningInstanceId())).findFirst();
-                invocationY.ifPresent(dataProcessorInvocation -> delta.add(new Tuple2<>(iX, dataProcessorInvocation)));
-            }
-        });
-        return delta;
-    }
-
-    static public PipelineOperationStatus verifyPipelineOperationStatus(PipelineOperationStatus status, String successMessage,
-                                                                         String errorMessage) {
-        //Duplicate from method in GraphSubmitter
-        status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-        if (status.isSuccess()) {
-            status.setTitle(successMessage);
-        } else {
-            status.setTitle(errorMessage);
-        }
-        return status;
-    }
-
-    public static void exchangePipelineElement(Pipeline exchangePipeline, Tuple2<? extends NamedStreamPipesEntity, ? extends NamedStreamPipesEntity> entity){
-
-        if (entity.a instanceof DataProcessorInvocation){
-            int index = exchangePipeline.getSepas().indexOf(entity.b);
-            exchangePipeline.getSepas().remove(index);
-            exchangePipeline.getSepas().add(index, (DataProcessorInvocation) entity.a);
-        }
-    }
-
-    public static Pipeline deepCopyPipeline(Pipeline object) throws JsonProcessingException {
-        ObjectMapper objectMapper = new ObjectMapper();
-        return objectMapper.readValue(objectMapper.writeValueAsString(object), Pipeline.class);
-    }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
deleted file mode 100644
index 11ecbf4..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
-import org.apache.streampipes.manager.data.PipelineGraphHelpers;
-import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.grounding.EventGrounding;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
-import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.user.management.encryption.CredentialsManager;
-import org.lightcouch.DocumentConflictException;
-
-import java.security.GeneralSecurityException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-public class PipelineExecutor {
-
-  private Pipeline pipeline;
-  private boolean visualize;
-  private boolean storeStatus;
-  private boolean monitor;
-
-  public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
-                          boolean monitor) {
-    this.pipeline = pipeline;
-    this.visualize = visualize;
-    this.storeStatus = storeStatus;
-    this.monitor = monitor;
-  }
-
-  public PipelineOperationStatus startPipeline() {
-
-    pipeline.getSepas().forEach(this::updateGroupIds);
-    pipeline.getActions().forEach(this::updateGroupIds);
-
-    List<DataProcessorInvocation> sepas = pipeline.getSepas();
-    List<DataSinkInvocation> secs = pipeline.getActions();
-
-    List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
-            SpDataSet((SpDataSet) s)).collect(Collectors.toList());
-
-    for (SpDataSet ds : dataSets) {
-      ds.setCorrespondingPipeline(pipeline.getPipelineId());
-    }
-
-    List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
-    graphs.addAll(sepas);
-    graphs.addAll(secs);
-
-    List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
-
-    graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
-
-    List<SpDataStreamRelayContainer> dataStreamRelayContainers = generateDataStreamRelays(decryptedGraphs);
-
-    PipelineOperationStatus status = new GraphSubmitter(
-            pipeline.getPipelineId(),
-            pipeline.getName(),
-            decryptedGraphs,
-            dataSets,
-            dataStreamRelayContainers).invokeGraphs();
-
-    if (status.isSuccess()) {
-      storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-
-      PipelineStatusManager.addPipelineStatus(
-              pipeline.getPipelineId(),
-              new PipelineStatusMessage(pipeline.getPipelineId(),
-                      System.currentTimeMillis(),
-                      PipelineStatusMessageType.PIPELINE_STARTED.title(),
-                      PipelineStatusMessageType.PIPELINE_STARTED.description()));
-
-      if (storeStatus) {
-        setPipelineStarted(pipeline);
-      }
-    }
-    return status;
-  }
-
-  public PipelineOperationStatus stopPipeline() {
-    List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
-    List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
-
-    List<SpDataStreamRelayContainer> dataStreamRelayContainers = generateDataStreamRelays(graphs);
-
-    PipelineOperationStatus status = new GraphSubmitter(
-            pipeline.getPipelineId(),
-            pipeline.getName(),
-            graphs,
-            dataSets,
-            dataStreamRelayContainers).detachGraphs();
-
-    if (status.isSuccess()) {
-      if (visualize) {
-        StorageDispatcher
-                .INSTANCE
-                .getNoSqlStore()
-                .getVisualizationStorageApi()
-                .deleteVisualization(pipeline.getPipelineId());
-      }
-      if (storeStatus) {
-        setPipelineStopped(pipeline);
-      }
-
-      PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
-              new PipelineStatusMessage(pipeline.getPipelineId(),
-                      System.currentTimeMillis(),
-                      PipelineStatusMessageType.PIPELINE_STOPPED.title(),
-                      PipelineStatusMessageType.PIPELINE_STOPPED.description()));
-
-    }
-    return status;
-  }
-
-  public PipelineOperationStatus migratePipelineElement(Pipeline currentPipeline,
-                                                         Tuple2<DataProcessorInvocation, DataProcessorInvocation> targetElement){
-    PipelineOperationStatus status = initPipelineOperationStatus();
-    //Purge existing relays
-    pipeline.getSepas().forEach(s -> s.setOutputStreamRelays(new ArrayList<>()));
-    //Generate new relays
-    PipelineGraph newPipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
-    new InvocationGraphBuilder(newPipelineGraph, pipeline.getPipelineId()).buildGraphs();
-    //Get predecessors
-    List<NamedStreamPipesEntity> predecessors = new ArrayList<>();
-    PipelineGraph currentPipelineGraph = new PipelineGraphBuilder(currentPipeline).buildGraph();
-    PipelineGraphHelpers
-            .findStreams(newPipelineGraph)
-            .forEach(source -> predecessors.addAll(getPredecessors(source, targetElement.a, newPipelineGraph, new ArrayList<>())));
-    //Find counterpart for predecessors in currentPipeline
-    List<Tuple2<NamedStreamPipesEntity, NamedStreamPipesEntity>> matchingPredecessors = new ArrayList<>();
-    for(NamedStreamPipesEntity e : predecessors){
-      matchingPredecessors.add(new Tuple2<>(e, findMatching(e, currentPipelineGraph)));
-    }
-    List<NamedStreamPipesEntity> currentPredecessors = matchingPredecessors.stream().map(t -> t.b).collect(Collectors.toList());
-
-
-    PipelineOperationStatus statusStartTarget = startPipelineElement(Collections.singletonList(targetElement.a));
-    statusStartTarget.getElementStatus().forEach(status::addPipelineElementStatus);
-    if(!statusStartTarget.isSuccess()){
-      //Target PE could not be started; nothing to roll back
-      return status;
-    }
-    //Stop relay to origin
-    PipelineOperationStatus statusStopRelays = stopRelays(currentPredecessors, targetElement.b);
-    statusStopRelays.getElementStatus().forEach(status::addPipelineElementStatus);
-    if(!statusStopRelays.isSuccess()){
-      //Rollback of target PE and all successfully stopped Relays
-      stopPipelineElements(Collections.singletonList(targetElement.a))
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-
-      Set<String> relaysToRollBack = statusStopRelays.getElementStatus().stream().filter(PipelineElementStatus::isSuccess)
-              .map(PipelineElementStatus::getElementId).collect(Collectors.toSet());
-      Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> rollbackRelays = findRelays(currentPredecessors, targetElement.b)
-              .entrySet()
-              .stream()
-              .filter(entry -> relaysToRollBack.contains(entry.getValue().getRunningStreamRelayInstanceId()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-      new GraphSubmitter(pipeline.getPipelineId(),
-              pipeline.getName(),  new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).invokeRelays(rollbackRelays)
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-      return status;
-    }
-    //start Relay to target
-    PipelineOperationStatus statusStartRelays = startRelays(predecessors, targetElement.a);
-    statusStartRelays.getElementStatus().forEach(status::addPipelineElementStatus);
-    if(!statusStartRelays.isSuccess()){
-      //Rollback target PE, stopped relays and all successfully started Relays
-      stopPipelineElements(Collections.singletonList(targetElement.a))
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-
-      startRelays(currentPredecessors, targetElement.b)
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-
-      Set<String> relaysToRollBack = statusStartRelays.getElementStatus().stream().filter(PipelineElementStatus::isSuccess)
-              .map(PipelineElementStatus::getElementId).collect(Collectors.toSet());
-      Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> rollbackRelays = findRelays(predecessors, targetElement.a)
-              .entrySet()
-              .stream()
-              .filter(entry -> relaysToRollBack.contains(entry.getValue().getRunningStreamRelayInstanceId()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-      new GraphSubmitter(pipeline.getPipelineId(),
-              pipeline.getName(),  new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).detachRelays(rollbackRelays)
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-      return status;
-    }
-    //Stop origin and associated relay
-    PipelineOperationStatus statusStopOrigin = stopPipelineElements(Collections.singletonList(targetElement.b));
-    statusStopOrigin.getElementStatus().forEach(status::addPipelineElementStatus);
-
-    if(!statusStopOrigin.isSuccess()){
-      //Rollback everything
-      stopRelays(predecessors, targetElement.a)
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-      startRelays(currentPredecessors, targetElement.b)
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-      stopPipelineElements(Collections.singletonList(targetElement.a))
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-      return status;
-    }
-
-    List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
-    graphs.addAll(pipeline.getActions());
-    graphs.addAll(pipeline.getSepas());
-    List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
-            SpDataSet((SpDataSet) s)).collect(Collectors.toList());
-    storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-
-    status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-    return status;
-  }
-
-  private PipelineOperationStatus stopPipelineElements(List<NamedStreamPipesEntity> graphs){
-    List<InvocableStreamPipesEntity> invocations = new ArrayList<>();
-    graphs.stream().filter(i -> i instanceof InvocableStreamPipesEntity).forEach(i -> invocations.add((InvocableStreamPipesEntity) i));
-
-    if (invocations.isEmpty()) {
-      PipelineOperationStatus ret = initPipelineOperationStatus();
-      ret.setSuccess(true);
-      return ret;
-    }
-    return new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(),  invocations, new ArrayList<>(), new ArrayList<>())
-            .detachGraphs();
-  }
-
-  private PipelineOperationStatus startPipelineElement(List<NamedStreamPipesEntity> graphs){
-    List<InvocableStreamPipesEntity> invocations = new ArrayList<>();
-    graphs.stream().filter(i -> i instanceof InvocableStreamPipesEntity).forEach(i -> invocations.add((InvocableStreamPipesEntity) i));
-
-    if (invocations.isEmpty()) {
-      PipelineOperationStatus ret = initPipelineOperationStatus();
-      ret.setSuccess(true);
-      return ret;
-    }
-    List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(invocations);
-    return new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(), decryptedGraphs, new ArrayList<>(), new ArrayList<>())
-            .invokeGraphs();
-  }
-
-  private PipelineOperationStatus stopRelays(List<NamedStreamPipesEntity> predecessors, InvocableStreamPipesEntity target){
-    Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays = findRelays(predecessors, target);
-    return new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(),  new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).detachRelays(relays);
-  }
-
-  private PipelineOperationStatus startRelays(List<NamedStreamPipesEntity> predecessors, InvocableStreamPipesEntity target){
-    Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays = findRelays(predecessors, target);
-    return new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(),  new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).invokeRelays(relays);
-  }
-
-  private Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> findRelays(List<NamedStreamPipesEntity> predecessors,
-                                                                             InvocableStreamPipesEntity target){
-
-    Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays = new HashMap<>();
-
-    predecessors.forEach(pred -> {
-      List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
-      SpDataStreamRelayContainer dsRelayContainer = new SpDataStreamRelayContainer();
-
-      if (pred instanceof DataProcessorInvocation){
-        //Data Processor
-        DataProcessorInvocation processorInvocation = (DataProcessorInvocation) pred;
-        dataStreamRelays.addAll(processorInvocation.getOutputStreamRelays().stream().
-                filter(r ->
-                        target.getInputStreams().stream().map(s ->
-                                s.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName())
-                                .collect(Collectors.toSet()).contains(r.getEventGrounding().getTransportProtocol()
-                                .getTopicDefinition().getActualTopicName()))
-                .collect(Collectors.toList()));
-        dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
-        dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
-        dsRelayContainer.setName(processorInvocation.getName() + " (Stream Relay)");
-        dsRelayContainer.setInputGrounding(new EventGrounding(processorInvocation.getOutputStream().getEventGrounding()));
-        dsRelayContainer.setDeploymentTargetNodeId(processorInvocation.getDeploymentTargetNodeId());
-        dsRelayContainer.setDeploymentTargetNodeHostname(processorInvocation.getDeploymentTargetNodeHostname());
-        dsRelayContainer.setDeploymentTargetNodePort(processorInvocation.getDeploymentTargetNodePort());
-        dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
-        relays.put(pred, dsRelayContainer);
-      } else if (pred instanceof SpDataStream){
-        //DataStream
-        SpDataStream dataStream = (SpDataStream) pred;
-        if (!matchingDeploymentTargets(dataStream, target)){
-          //There is a relay that needs to be removed
-          dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
-                  .get(getIndex(pred.getDOM(), target))
-                  .getEventGrounding())));
-
-          dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
-          dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
-          dsRelayContainer.setName(dataStream.getName() + " (Stream Relay)");
-          dsRelayContainer.setInputGrounding(new EventGrounding(dataStream.getEventGrounding()));
-          dsRelayContainer.setDeploymentTargetNodeId(dataStream.getDeploymentTargetNodeId());
-          dsRelayContainer.setDeploymentTargetNodeHostname(dataStream.getDeploymentTargetNodeHostname());
-          dsRelayContainer.setDeploymentTargetNodePort(dataStream.getDeploymentTargetNodePort());
-          dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
-          relays.put(pred, dsRelayContainer);
-        }
-      }
-    });
-    return relays;
-  }
-
-
-  private List<NamedStreamPipesEntity> getPredecessors(NamedStreamPipesEntity source,
-                                                           InvocableStreamPipesEntity target, PipelineGraph pipelineGraph,
-                                                           List<NamedStreamPipesEntity> foundPredecessors){
-    //TODO: Check if this works for all graph topologies
-    if (pipelineGraph.outgoingEdgesOf(source)
-            .stream()
-            .map(pipelineGraph::getEdgeTarget)
-            .map(g -> (InvocableStreamPipesEntity) g)
-            .collect(Collectors.toSet()).contains(target)){
-      foundPredecessors.add(source);
-    } else{
-      List<NamedStreamPipesEntity> successors = pipelineGraph.outgoingEdgesOf(source)
-              .stream()
-              .map(pipelineGraph::getEdgeTarget)
-              .map(g -> (InvocableStreamPipesEntity) g)
-              .collect(Collectors.toList());
-      if (successors.isEmpty()) return foundPredecessors;
-      successors.forEach(successor -> getPredecessors(successor, target, pipelineGraph, foundPredecessors));
-    }
-    return foundPredecessors;
-  }
-
-  private NamedStreamPipesEntity findMatching(NamedStreamPipesEntity entity, PipelineGraph pipelineGraph){
-    AtomicReference<NamedStreamPipesEntity> match = new AtomicReference<>();
-    PipelineGraphHelpers.findStreams(pipelineGraph).forEach(ds -> {
-    NamedStreamPipesEntity foundEntity = compareGraphs(ds, entity, pipelineGraph, new ArrayList<>());
-    if (foundEntity != null) match.set(foundEntity);
-    });
-    return match.get();
-  }
-
-  private NamedStreamPipesEntity compareGraphs(NamedStreamPipesEntity source, NamedStreamPipesEntity searchedEntity, PipelineGraph pipelineGraph, List<NamedStreamPipesEntity> successors){
-    if(source.getDOM().equals(searchedEntity.getDOM())) return source;
-    else if (successors.isEmpty())
-      successors = pipelineGraph.outgoingEdgesOf(source)
-            .stream()
-            .map(pipelineGraph::getEdgeTarget)
-            .map(g -> (InvocableStreamPipesEntity) g)
-            .collect(Collectors.toList());
-    Optional<NamedStreamPipesEntity> successor= successors.stream().findFirst();
-    if (successor.isPresent()) {
-      successors.remove(successor.get());
-      return compareGraphs(successor.get(), searchedEntity, pipelineGraph, successors);
-    }
-    return null;
-  }
-
-  private PipelineOperationStatus initPipelineOperationStatus() {
-    //Duplicate from method in GraphSubmitter
-    PipelineOperationStatus status = new PipelineOperationStatus();
-    status.setPipelineId(pipeline.getPipelineId());
-    status.setPipelineName(pipeline.getName());
-    return status;
-  }
-
-
-  private String extractTopic(EventGrounding eg) {
-    return eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
-  }
-
-  private List<SpDataStreamRelayContainer> generateDataStreamRelays(List<InvocableStreamPipesEntity> graphs) {
-    Set<String> topicSet = new HashSet<>();
-    List<SpDataStreamRelayContainer> dsRelayContainers = new ArrayList<>();
-    SpDataStreamRelayContainer dsRelayContainer = new SpDataStreamRelayContainer();
-    List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
-
-    graphs.stream()
-            .filter(g -> pipeline.getStreams().stream()
-                    .filter(ds -> topicSet.add(extractTopic(ds.getEventGrounding())))
-                    .anyMatch(ds -> (!matchingDeploymentTargets(ds, g) && connected(ds, g))))
-            .forEach(g -> pipeline.getStreams()
-                    .forEach(ds -> {
-                      dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
-                      dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
-                      dsRelayContainer.setName(ds.getName() + " (Stream Relay)");
-                      dsRelayContainer.setInputGrounding(new EventGrounding(ds.getEventGrounding()));
-                      dsRelayContainer.setDeploymentTargetNodeId(ds.getDeploymentTargetNodeId());
-                      dsRelayContainer.setDeploymentTargetNodeHostname(ds.getDeploymentTargetNodeHostname());
-                      dsRelayContainer.setDeploymentTargetNodePort(ds.getDeploymentTargetNodePort());
-
-                      dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(g.getInputStreams()
-                              .get(getIndex(ds.getDOM(), g))
-                              .getEventGrounding())));
-                    })
-            );
-
-    dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
-    dsRelayContainers.add(dsRelayContainer);
-
-    return dsRelayContainers;
-  }
-
-  private boolean matchingDeploymentTargets(SpDataStream source, InvocableStreamPipesEntity target) {
-    return source.getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
-  }
-
-  private boolean connected(SpDataStream source, InvocableStreamPipesEntity target) {
-    int index = getIndex(source.getDOM(), target);
-    if (index != -1) {
-      return target.getConnectedTo().get(index).equals(source.getDOM());
-    }
-    return false;
-  }
-
-  private void updateGroupIds(InvocableStreamPipesEntity entity) {
-    entity.getInputStreams()
-            .stream()
-            .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
-            .map(is -> is.getEventGrounding().getTransportProtocol())
-            .map(KafkaTransportProtocol.class::cast)
-            .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
-  }
-
-  private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
-    List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
-    graphs.stream().map(g -> {
-      if (g instanceof DataProcessorInvocation) {
-        return new DataProcessorInvocation((DataProcessorInvocation) g);
-      } else {
-        return new DataSinkInvocation((DataSinkInvocation) g);
-      }
-    }).forEach(g -> {
-      g.getStaticProperties()
-              .stream()
-              .filter(SecretStaticProperty.class::isInstance)
-              .forEach(sp -> {
-                try {
-                  String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
-                          ((SecretStaticProperty) sp).getValue());
-                  ((SecretStaticProperty) sp).setValue(decrypted);
-                  ((SecretStaticProperty) sp).setEncrypted(false);
-                } catch (GeneralSecurityException e) {
-                  e.printStackTrace();
-                }
-              });
-      decryptedGraphs.add(g);
-    });
-    return decryptedGraphs;
-  }
-
-  private void setPipelineStarted(Pipeline pipeline) {
-    pipeline.setRunning(true);
-    pipeline.setStartedAt(new Date().getTime());
-    try {
-      getPipelineStorageApi().updatePipeline(pipeline);
-    } catch (DocumentConflictException dce) {
-      //dce.printStackTrace();
-    }
-  }
-
-  private void setPipelineStopped(Pipeline pipeline) {
-    pipeline.setRunning(false);
-    getPipelineStorageApi().updatePipeline(pipeline);
-  }
-
-  private void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs,
-                                     List<SpDataSet> dataSets) {
-    TemporaryGraphStorage.graphStorage.put(pipelineId, graphs);
-    TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets);
-  }
-
-  private IPipelineStorage getPipelineStorageApi() {
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
-  }
-
-  private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity targetElement) {
-    return targetElement.getConnectedTo().indexOf(sourceDomId);
-  }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
index e62ff41..5ee8b51 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
@@ -16,7 +16,7 @@
  *
  */
 package org.apache.streampipes.manager.execution.http;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 
 public class StreamRelayEndpointUrlGenerator extends EndpointUrlGenerator<SpDataStreamRelayContainer> {
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
new file mode 100644
index 0000000..af0aced
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.pipeline;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.manager.data.PipelineGraph;
+import org.apache.streampipes.manager.data.PipelineGraphHelpers;
+import org.apache.streampipes.manager.execution.http.GraphSubmitter;
+import org.apache.streampipes.manager.node.NodeClusterManager;
+import org.apache.streampipes.manager.util.TemporaryGraphStorage;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.apache.streampipes.user.management.encryption.CredentialsManager;
+
+import java.security.GeneralSecurityException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public abstract class AbstractPipelineExecutor {
+
+    protected Pipeline pipeline;
+    protected boolean visualize;
+    protected boolean storeStatus;
+    protected boolean monitor;
+
+    public AbstractPipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor) {
+        this.pipeline = pipeline;
+        this.visualize = visualize;
+        this.storeStatus = storeStatus;
+        this.monitor = monitor;
+    }
+
+    // standard methods
+    protected void setPipelineStarted(Pipeline pipeline) {
+        pipeline.setRunning(true);
+        pipeline.setStartedAt(new Date().getTime());
+        getPipelineStorageApi().updatePipeline(pipeline);
+    }
+
+    protected void setPipelineStopped(Pipeline pipeline) {
+        pipeline.setRunning(false);
+        getPipelineStorageApi().updatePipeline(pipeline);
+    }
+
+    protected void deleteVisualization(String pipelineId) {
+        StorageDispatcher.INSTANCE
+                .getNoSqlStore()
+                .getVisualizationStorageApi()
+                .deleteVisualization(pipelineId);
+    }
+
+    protected void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs,
+                                       List<SpDataSet> dataSets) {
+        TemporaryGraphStorage.graphStorage.put(pipelineId, graphs);
+        TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets);
+    }
+
+    protected void storeDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
+        relays.forEach(NodeClusterManager::persistDataStreamRelay);
+    }
+
+    protected void deleteDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
+        relays.forEach(NodeClusterManager::deleteDataStreamRelay);
+    }
+
+    protected void updateDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
+        relays.forEach(NodeClusterManager::updateDataStreamRelay);
+    }
+
+
+    protected PipelineOperationStatus startPipelineElementsAndRelays(List<InvocableStreamPipesEntity> graphs,
+                                                                     List<SpDataStreamRelayContainer> relays){
+        if (graphs.isEmpty()) {
+            return initPipelineOperationStatus();
+        }
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+                graphs, new ArrayList<>(), relays).invokePipelineElementsAndRelays();
+    }
+
+    protected PipelineOperationStatus stopPipelineElementsAndRelays(List<InvocableStreamPipesEntity> graphs,
+                                                                    List<SpDataStreamRelayContainer> relays){
+        if (graphs.isEmpty()) {
+            return initPipelineOperationStatus();
+        }
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+                graphs, new ArrayList<>(),relays).detachPipelineElementsAndRelays();
+    }
+
+    protected PipelineOperationStatus startRelays(List<SpDataStreamRelayContainer> relays){
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), new ArrayList<>(), new ArrayList<>(),
+                relays).invokeRelaysOnMigration();
+    }
+
+    protected PipelineOperationStatus stopRelays(List<SpDataStreamRelayContainer> relays){
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),new ArrayList<>(), new ArrayList<>(),
+                relays).detachRelaysOnMigration();
+    }
+
+    protected List<SpDataStreamRelayContainer> findRelays(List<NamedStreamPipesEntity> predecessors,
+                                                          InvocableStreamPipesEntity target){
+
+        List<SpDataStreamRelayContainer> relays = new ArrayList<>();
+
+        predecessors.forEach(pred -> {
+            List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
+            SpDataStreamRelayContainer relayContainer = new SpDataStreamRelayContainer();
+
+            if (pred instanceof DataProcessorInvocation){
+                //Data Processor
+                DataProcessorInvocation graph = (DataProcessorInvocation) pred;
+                if (differentDeploymentTargets(pred, target)) {
+                    dataStreamRelays.addAll(findRelaysWithMatchingTopic(graph, target));
+
+                    //dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
+                    relayContainer = new SpDataStreamRelayContainer(graph);
+                    relayContainer.setOutputStreamRelays(dataStreamRelays);
+
+                    relays.add(relayContainer);
+                }
+            } else if (pred instanceof SpDataStream){
+                //DataStream
+                SpDataStream stream = (SpDataStream) pred;
+                if (differentDeploymentTargets(stream, target)){
+                    //There is a relay that needs to be removed
+                    dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
+                            .get(getIndex(pred.getDOM(), target))
+                            .getEventGrounding())));
+
+                    String id = extractUniqueAdpaterId(stream.getElementId());
+                    String relayStrategy = pipeline.getEventRelayStrategy();
+
+                    relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
+                }
+            }
+        });
+        return relays;
+    }
+
+    protected List<NamedStreamPipesEntity> getPredecessors(NamedStreamPipesEntity source,
+                                                           InvocableStreamPipesEntity target,
+                                                           PipelineGraph pipelineGraph,
+                                                           List<NamedStreamPipesEntity> foundPredecessors){
+        // Collects into foundPredecessors every element reachable from source that has a
+        // direct edge to target; the search does not continue below such an element.
+        //TODO: Check if this works for all graph topologies
+        boolean feedsTarget = getTargetsAsSet(source, pipelineGraph, InvocableStreamPipesEntity.class)
+                .contains(target);
+        if (feedsTarget) {
+            foundPredecessors.add(source);
+            return foundPredecessors;
+        }
+        List<NamedStreamPipesEntity> children = getTargetsAsList(source, pipelineGraph,
+                NamedStreamPipesEntity.class);
+        for (NamedStreamPipesEntity child : children) {
+            getPredecessors(child, target, pipelineGraph, foundPredecessors);
+        }
+        return foundPredecessors;
+    }
+
+    protected NamedStreamPipesEntity findMatching(NamedStreamPipesEntity entity, PipelineGraph pipelineGraph){
+        // Search downstream of every data stream in the graph; when several streams
+        // lead to a match, the one found via the last stream is returned.
+        NamedStreamPipesEntity lastHit = null;
+        for (SpDataStream origin : PipelineGraphHelpers.findStreams(pipelineGraph)) {
+            NamedStreamPipesEntity candidate = compareGraphs(origin, entity, pipelineGraph, new ArrayList<>());
+            if (candidate != null) {
+                lastHit = candidate;
+            }
+        }
+        return lastHit;
+    }
+
+    private NamedStreamPipesEntity compareGraphs(NamedStreamPipesEntity source,
+                                                   NamedStreamPipesEntity searchedEntity,
+                                                   PipelineGraph pipelineGraph,
+                                                   List<NamedStreamPipesEntity> successors){
+        // Depth-first search from source for the element whose DOM id equals searchedEntity's.
+        // 'successors' is kept for signature compatibility only; each level reads its own
+        // outgoing edges, so every sibling branch is visited. Returns null if nothing matches.
+        if (matchingDOM(source, searchedEntity)) {
+            return source;
+        }
+        for (NamedStreamPipesEntity next : getTargetsAsList(source, pipelineGraph, NamedStreamPipesEntity.class)) {
+            NamedStreamPipesEntity found = compareGraphs(next, searchedEntity, pipelineGraph, successors);
+            if (found != null) return found;
+        }
+        return null;
+    }
+
+    protected List<SpDataStreamRelayContainer> generateRelays(List<InvocableStreamPipesEntity> graphs) {
+        // keep only those generated containers that carry at least one output stream relay
+        List<SpDataStreamRelayContainer> candidates = generateDataStreamRelays(graphs);
+        return candidates.stream().filter(c -> !c.getOutputStreamRelays().isEmpty()).collect(Collectors.toList());
+    }
+
+    private List<SpDataStreamRelayContainer> generateDataStreamRelays(List<InvocableStreamPipesEntity> graphs) {
+        List<SpDataStreamRelayContainer> relays = new ArrayList<>();
+
+        for (InvocableStreamPipesEntity graph : graphs) {
+            for (SpDataStream stream: pipeline.getStreams()) {
+                if (differentDeploymentTargets(stream, graph) && connected(stream, graph)) {
+
+                    List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
+                    dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(graph.getInputStreams()
+                            .get(getIndex(stream.getDOM(), graph))
+                            .getEventGrounding())));
+
+                    String id = extractUniqueAdpaterId(stream.getElementId());
+                    String relayStrategy = pipeline.getEventRelayStrategy();
+
+                    relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
+                }
+            }
+            for (DataProcessorInvocation processor : pipeline.getSepas()) {
+                if (differentDeploymentTargets(processor, graph) && connected(processor, graph)) {
+                    SpDataStreamRelayContainer processorRelay = new SpDataStreamRelayContainer(processor);
+                    relays.add(processorRelay);
+                }
+            }
+        }
+        return relays;
+    }
+
+
+    // Helpers
+
+    /**
+     * Updates group.id for data processor/sink. Note: KafkaTransportProtocol only!!
+     *
+     * @param entity    data processor/sink
+     */
+    protected void updateKafkaGroupIds(InvocableStreamPipesEntity entity) {
+        entity.getInputStreams()
+                .stream()
+                .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
+                .map(is -> is.getEventGrounding().getTransportProtocol())
+                .map(KafkaTransportProtocol.class::cast)
+                .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
+    }
+
+    /**
+     * Decrypt potential secrets contained in static properties, e.g., passwords
+     *
+     * @param graphs    List of graphs
+     * @return  List of decrypted graphs
+     */
+    protected List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+        List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
+        graphs.stream().map(g -> {
+            if (g instanceof DataProcessorInvocation) {
+                return new DataProcessorInvocation((DataProcessorInvocation) g);
+            } else {
+                return new DataSinkInvocation((DataSinkInvocation) g);
+            }
+        }).forEach(g -> {
+            g.getStaticProperties()
+                    .stream()
+                    .filter(SecretStaticProperty.class::isInstance)
+                    .forEach(sp -> {
+                        try {
+                            String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
+                                    ((SecretStaticProperty) sp).getValue());
+                            ((SecretStaticProperty) sp).setValue(decrypted);
+                            ((SecretStaticProperty) sp).setEncrypted(false);
+                        } catch (GeneralSecurityException e) {
+                            e.printStackTrace();
+                        }
+                    });
+            decryptedGraphs.add(g);
+        });
+        return decryptedGraphs;
+    }
+
+    /**
+     * Get pipeline storage dispatcher API
+     *
+     * @return IPipelineStorage NoSQL storage interface for pipelines
+     */
+    private IPipelineStorage getPipelineStorageApi() {
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
+    }
+
+    /**
+     * Extract topic name
+     *
+     * @param entity
+     * @return
+     */
+    private String extractActualTopic(NamedStreamPipesEntity entity) {
+        if (entity instanceof SpDataStream) {
+            return ((SpDataStream) entity)
+                    .getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
+        } else if (entity instanceof SpDataStreamRelay) {
+            return ((SpDataStreamRelay) entity)
+                    .getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
+        }
+        throw new SpRuntimeException("Could not extract actual topic name from entity");
+    }
+
+    // Edge / Migration Helpers
+
+    /**
+     * Compare deployment targets of two pipeline elements, namely data stream/processor (source) and data
+     * processor/sink (target). Node ids are compared null-safely; two missing ids count as the same target.
+     *
+     * @param source    data stream/processor
+     * @param target    data processor/sink
+     * @return true if source and target are deployed on different nodes, false if they share the same node
+     */
+    private boolean differentDeploymentTargets(NamedStreamPipesEntity source, InvocableStreamPipesEntity target) {
+        if (source instanceof SpDataStream) {
+            return !Objects.equals(((SpDataStream) source).getDeploymentTargetNodeId(), target.getDeploymentTargetNodeId());
+        } else if (source instanceof DataProcessorInvocation) {
+            return !Objects.equals(((DataProcessorInvocation) source).getDeploymentTargetNodeId(), target.getDeploymentTargetNodeId());
+        }
+        throw new SpRuntimeException("Matching deployment targets check failed");
+    }
+
+    /**
+     * Find relays with matching topics
+     *
+     * @param graph     data processor
+     * @param target    data processor/sink
+     * @return collection of data stream relays
+     */
+    private Collection<? extends SpDataStreamRelay> findRelaysWithMatchingTopic(DataProcessorInvocation graph,
+                                                                                InvocableStreamPipesEntity target) {
+        return graph.getOutputStreamRelays().stream().
+                filter(relay ->
+                        target.getInputStreams().stream()
+                                .map(this::extractActualTopic)
+                                .collect(Collectors.toSet())
+                                .contains(extractActualTopic(relay)))
+                .collect(Collectors.toList());
+    }
+
+
+    private <T> Set<T> getTargetsAsSet(NamedStreamPipesEntity source, PipelineGraph pipelineGraph,
+                                       Class<T> clazz){
+        return pipelineGraph.outgoingEdgesOf(source)
+                .stream()
+                .map(pipelineGraph::getEdgeTarget)
+                .map(clazz::cast)
+                .collect(Collectors.toSet());
+    }
+
+    private <T> List<T> getTargetsAsList(NamedStreamPipesEntity source, PipelineGraph pipelineGraph,
+                                         Class<T> clazz){
+        return new ArrayList<>(getTargetsAsSet(source, pipelineGraph, clazz));
+    }
+
+    /**
+     * Compare connection of two pipeline elements, namely data stream/processor (source) and data processor/sink
+     * (target) by DOM identifier.
+     *
+     * @param source    data stream or data processor
+     * @param target    data processor/sink
+     * @return boolean value that returns true if source and target are connected, else false
+     */
+    private boolean connected(NamedStreamPipesEntity source, InvocableStreamPipesEntity target) {
+        int index = getIndex(source.getDOM(), target);
+        if (index != -1) {
+            return target.getConnectedTo().get(index).equals(source.getDOM());
+        }
+        return false;
+    }
+
+    /**
+     * Get index of data processor/sink connection based on source DOM identifier
+     *
+     * @param sourceDomId   source DOM identifier
+     * @param target        data processor/sink
+     * @return Integer with index of connection, if invalid returns -1.
+     */
+    private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity target) {
+        return target.getConnectedTo().indexOf(sourceDomId);
+    }
+
+    /**
+     * Checks if DOM are equal
+     *
+     * @param source pipeline element
+     * @param target pipeline element
+     * @return true if DOM is the same, else false
+     */
+    private boolean matchingDOM(NamedStreamPipesEntity source, NamedStreamPipesEntity target) {
+        return source.getDOM().equals(target.getDOM());
+    }
+
+    /**
+     * Select the invocable elements, i.e., data processors/sinks, from a list of NamedStreamPipesEntity.
+     * Elements of any other kind are dropped; the relative order of the kept elements is preserved.
+     *
+     * @param graphs    pipeline elements of any kind
+     * @return new mutable list holding only the InvocableStreamPipesEntity instances of graphs
+     */
+    private List<InvocableStreamPipesEntity> getListOfInvocableStreamPipesEntity(List<NamedStreamPipesEntity> graphs) {
+        return graphs.stream()
+                .filter(InvocableStreamPipesEntity.class::isInstance)
+                .map(InvocableStreamPipesEntity.class::cast)
+                .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+    /**
+     * Create pipeline operation status with pipeline id and name and set success to true
+     *
+     * @return PipelineOperationStatus
+     */
+    protected PipelineOperationStatus initPipelineOperationStatus() {
+        PipelineOperationStatus status = new PipelineOperationStatus();
+        status.setPipelineId(pipeline.getPipelineId());
+        status.setPipelineName(pipeline.getName());
+        status.setSuccess(true);
+        return status;
+    }
+
+    private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
+        return graphs
+                .stream()
+                .filter(clazz::isInstance)
+                .map(clazz::cast)
+                .collect(Collectors.toList());
+    }
+
+    private String extractUniqueAdpaterId(String s) {
+        return s.substring(s.lastIndexOf("/") + 1);
+    }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
new file mode 100644
index 0000000..17aa1ce
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.pipeline;
+
+import org.apache.streampipes.manager.execution.http.GraphSubmitter;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
+import org.apache.streampipes.manager.util.TemporaryGraphStorage;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.message.PipelineStatusMessage;
+import org.apache.streampipes.model.message.PipelineStatusMessageType;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class PipelineExecutor extends AbstractPipelineExecutor {
+    /** Creates an executor for the given pipeline; all arguments are handed unchanged to AbstractPipelineExecutor */
+    public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor) {
+        super(pipeline, visualize, storeStatus, monitor);
+    }
+    /** Invokes processors, sinks, data sets and relays of the pipeline; results are stored only on success */
+    public PipelineOperationStatus startPipeline() {
+        // updateKafkaGroupIds (not defined in this class) is applied to every processor and sink before submission
+        pipeline.getSepas().forEach(this::updateKafkaGroupIds);
+        pipeline.getActions().forEach(this::updateKafkaGroupIds);
+
+        List<DataProcessorInvocation> sepas = pipeline.getSepas();
+        List<DataSinkInvocation> secs = pipeline.getActions();
+        // data sets among the streams are copied; each copy is tagged with the id of this pipeline
+        List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
+                SpDataSet((SpDataSet) s)).collect(Collectors.toList());
+
+        for (SpDataSet ds : dataSets) {
+          ds.setCorrespondingPipeline(pipeline.getPipelineId());
+        }
+
+        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
+        graphs.addAll(sepas);
+        graphs.addAll(secs);
+        // decryptedGraphs is what gets submitted below; graphs is what gets stored after a successful start
+        List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
+        // NOTE(review): whether this also changes decryptedGraphs depends on decryptSecrets returning copies - confirm
+        graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
+
+        List<SpDataStreamRelayContainer> relays = generateRelays(decryptedGraphs);
+
+        PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+                decryptedGraphs, dataSets, relays).invokePipelineElementsAndRelays();
+
+        if (status.isSuccess()) {
+            storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+            storeDataStreamRelayContainer(relays);
+
+            PipelineStatusManager.addPipelineStatus(
+                  pipeline.getPipelineId(),
+                  new PipelineStatusMessage(pipeline.getPipelineId(),
+                          System.currentTimeMillis(),
+                          PipelineStatusMessageType.PIPELINE_STARTED.title(),
+                          PipelineStatusMessageType.PIPELINE_STARTED.description()));
+            // the started flag is only written when this executor was created with storeStatus=true
+          if (storeStatus) setPipelineStarted(pipeline);
+        }
+        return status;
+    }
+    /** Detaches the pipeline using the graphs and data sets held in TemporaryGraphStorage for its pipeline id */
+    public PipelineOperationStatus stopPipeline() {
+        List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
+        List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
+        List<SpDataStreamRelayContainer> relays = generateRelays(graphs);
+        // NOTE(review): presumably graphs/dataSets are null if nothing is stored for this pipeline id - confirm
+        PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), graphs,
+                dataSets, relays).detachPipelineElementsAndRelays();
+
+        if (status.isSuccess()) {
+            if (visualize) deleteVisualization(pipeline.getPipelineId());
+            if (storeStatus) setPipelineStopped(pipeline);
+
+            deleteDataStreamRelayContainer(relays);
+
+            PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
+                  new PipelineStatusMessage(pipeline.getPipelineId(),
+                          System.currentTimeMillis(),
+                          PipelineStatusMessageType.PIPELINE_STOPPED.title(),
+                          PipelineStatusMessageType.PIPELINE_STOPPED.description()));
+        }
+        return status;
+    }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
new file mode 100644
index 0000000..e610d24
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.pipeline;
+
+import org.apache.streampipes.manager.data.PipelineGraph;
+import org.apache.streampipes.manager.data.PipelineGraphBuilder;
+import org.apache.streampipes.manager.data.PipelineGraphHelpers;
+import org.apache.streampipes.manager.execution.http.GraphSubmitter;
+import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
+
+    /**
+     * Old pipeline before migration
+     */
+    private final Pipeline pipelineBeforeMigration;
+
+    /**
+     * Pipeline element to be migrated. Contains pair of source and target element description
+     */
+    private final PipelineElementMigrationEntity migrationEntity;
+
+    /**
+     * Predecessors of the pipeline element to be migrated in the migration pipeline
+     */
+    private final List<NamedStreamPipesEntity> predecessorsAfterMigration;
+
+    /**
+     * Predecessors of the pipeline element to be migrated in the old pipeline before migration
+     */
+    private final List<NamedStreamPipesEntity> predecessorsBeforeMigration;
+
+    /**
+     * Collection of relays that were created in the migration process needing to be stored
+     */
+    private final List<SpDataStreamRelayContainer> relaysToBePersisted;
+
+    /**
+     * Collection of relays that were removed in the migration process needing to be deleted from persistent storage.
+     */
+    private final List<SpDataStreamRelayContainer> relaysToBeDeleted;
+
+
+    public PipelineMigrationExecutor(Pipeline pipeline, Pipeline pipelineBeforeMigration,
+            PipelineElementMigrationEntity migrationEntity, boolean visualize, boolean storeStatus, boolean monitor) {
+        super(pipeline, visualize, storeStatus, monitor);
+        this.migrationEntity = migrationEntity;
+        this.pipelineBeforeMigration = pipelineBeforeMigration;
+        // working lists start out empty; they are filled while the migration is prepared and executed
+        this.relaysToBeDeleted = new ArrayList<>();
+        this.relaysToBePersisted = new ArrayList<>();
+        this.predecessorsBeforeMigration = new ArrayList<>();
+        this.predecessorsAfterMigration = new ArrayList<>();
+    }
+
+    // TODO: refactor!
+    /**
+     * Migrate the pipeline element of migrationEntity: (1) start new element, (2) stop relay to origin element,
+     * (3) start relay to new element, (4) stop origin element, (5) stop origin relay. A step that fails triggers
+     * the rollback of the steps before it, and the returned status is then marked as not successful.
+     *
+     * @return status with the element status of every executed step; success only if the whole migration succeeded
+     */
+    public PipelineOperationStatus migratePipelineElement() {
+        PipelineOperationStatus status = initPipelineOperationStatus();
+        prepareMigration();
+
+        // Start target pipeline elements and relays on new target node
+        PipelineOperationStatus statusStartTarget = startTargetPipelineElementsAndRelays(status);
+        if(!statusStartTarget.isSuccess()){
+            //Target PE could not be started; nothing to roll back
+            status.setSuccess(false); // initialised with success=true, so the failure must be recorded explicitly
+            return status;
+        }
+
+        // Stop relays from origin predecessor
+        PipelineOperationStatus statusStopRelays = stopRelaysFromPredecessorsBeforeMigration(status);
+        if(!statusStopRelays.isSuccess()){
+            rollbackToPreMigrationStepOne(statusStopRelays, status);
+            status.setSuccess(false);
+            return status;
+        }
+
+        // Start relays to target after migration
+        PipelineOperationStatus statusStartRelays = startRelaysFromPredecessorsAfterMigration(status);
+        if(!statusStartRelays.isSuccess()){
+            rollbackToPreMigrationStepTwo(statusStartRelays, status);
+            status.setSuccess(false);
+            return status;
+        }
+
+        //Stop origin and associated relay
+        PipelineOperationStatus statusStopOrigin = stopOriginPipelineElementsAndRelays(status);
+        if(!statusStopOrigin.isSuccess()){
+            rollbackToPreMigrationStepThree(status);
+            status.setSuccess(false);
+            return status;
+        }
+
+        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
+        graphs.addAll(pipeline.getActions());
+        graphs.addAll(pipeline.getSepas());
+        List<SpDataSet> dataSets = findDataSets();
+
+        // store new pipeline and relays
+        storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+        storeDataStreamRelayContainer(relaysToBePersisted);
+        deleteDataStreamRelayContainer(relaysToBeDeleted);
+        // set global status
+        status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
+        return status;
+    }
+
+    private void prepareMigration() {
+        // Reset the output stream relays of all processors of the new pipeline (see purgeExistingRelays)
+        purgeExistingRelays();
+
+        // Run InvocationGraphBuilder on the graph of the new pipeline; presumably this regenerates the relays - confirm
+        PipelineGraph pipelineGraphAfterMigration = new PipelineGraphBuilder(pipeline).buildGraph();
+        buildGraphWithRelays(pipelineGraphAfterMigration);
+
+        // Collect the predecessors of the target element in the new pipeline
+        PipelineGraph pipelineGraphBeforeMigration = new PipelineGraphBuilder(pipelineBeforeMigration).buildGraph();
+        findPredecessorsInMigrationPipeline(pipelineGraphAfterMigration);
+
+        // Look up the counterpart of each of these predecessors in the pipeline before migration
+        findAndComparePredecessorsInCurrentPipeline(pipelineGraphBeforeMigration);
+    }
+
+    private void rollbackToPreMigrationStepOne(PipelineOperationStatus statusStopRelays,
+                                               PipelineOperationStatus status) {
+        // Undo step 1: detach the target pipeline element and its relays from the new target node
+        PipelineOperationStatus targetDetached = rollbackTargetPipelineElementsAndRelays();
+
+        // Undo step 2: only relays whose stop succeeded need a restart, so they are selected by the element ids
+        // that statusStopRelays reports as successful
+        PipelineOperationStatus relaysRestarted =
+                rollbackToOriginRelaysByInvoke(extractUniqueRelayIds(statusStopRelays));
+
+        // Add status to global migration status
+        updateMigrationStatus(targetDetached, status);
+        updateMigrationStatus(relaysRestarted, status);
+    }
+
+    private void rollbackToPreMigrationStepTwo(PipelineOperationStatus statusStartRelays,
+                                               PipelineOperationStatus status) {
+        // Undo step 1: detach the target pipeline element and its relays
+        PipelineOperationStatus targetDetached = rollbackTargetPipelineElementsAndRelays();
+
+        // Undo step 2: restart all relays from the predecessors before migration to the origin element
+        List<SpDataStreamRelayContainer> originRelays =
+                findRelays(predecessorsBeforeMigration, migrationEntity.getSourceElement());
+        PipelineOperationStatus originRelaysRestarted = startRelays(originRelays);
+
+        // Undo step 3: stop those relays to the target that statusStartRelays reports as successfully started
+        Set<String> startedRelayIds = extractUniqueRelayIds(statusStartRelays);
+        PipelineOperationStatus targetRelaysStopped = rollbackTargetRelaysByDetach(startedRelayIds);
+
+        // Add status to global migration status
+        updateMigrationStatus(targetDetached, status);
+        updateMigrationStatus(originRelaysRestarted, status);
+        updateMigrationStatus(targetRelaysStopped, status);
+    }
+
+    private void rollbackToPreMigrationStepThree(PipelineOperationStatus status) {
+        // Undo step 3: stop the relays from the predecessors after migration to the target element
+        PipelineOperationStatus targetRelaysStopped =
+                stopRelays(findRelays(predecessorsAfterMigration, migrationEntity.getTargetElement()));
+
+        // Undo step 2: restart the relays from the predecessors before migration to the origin element
+        PipelineOperationStatus originRelaysRestarted =
+                startRelays(findRelays(predecessorsBeforeMigration, migrationEntity.getSourceElement()));
+
+        // Undo step 1: stop the target element together with the relays derived from it
+        List<InvocableStreamPipesEntity> target = Collections.singletonList(migrationEntity.getTargetElement());
+        PipelineOperationStatus targetStopped =
+                stopPipelineElementsAndRelays(target, extractRelaysFromDataProcessor(target));
+
+        updateMigrationStatus(targetRelaysStopped, status);
+        updateMigrationStatus(originRelaysRestarted, status);
+        updateMigrationStatus(targetStopped, status);
+    }
+
+
+    private PipelineOperationStatus rollbackToOriginRelaysByInvoke(Set<String> relayIdsToRollback) {
+        // restart the relays (selected by id) that lead from the predecessors before migration to the origin element
+        InvocableStreamPipesEntity origin = migrationEntity.getSourceElement();
+
+        return startRelays(findRelaysAndFilterById(relayIdsToRollback, predecessorsBeforeMigration, origin));
+    }
+
+    private PipelineOperationStatus rollbackTargetRelaysByDetach(Set<String> relayIdsToRollback) {
+        // stop the relays (selected by id) that lead from the predecessors after migration to the target element
+        InvocableStreamPipesEntity target = migrationEntity.getTargetElement();
+
+        return stopRelays(findRelaysAndFilterById(relayIdsToRollback, predecessorsAfterMigration, target));
+    }
+
+    private PipelineOperationStatus rollbackTargetPipelineElementsAndRelays() {
+        // detach the target element and the relays derived from it; the data set list handed over is empty
+        List<InvocableStreamPipesEntity> target = Collections.singletonList(migrationEntity.getTargetElement());
+        GraphSubmitter submitter = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), target,
+                new ArrayList<>(), extractRelaysFromDataProcessor(target));
+        return submitter.detachPipelineElementsAndRelays();
+    }
+
+    private PipelineOperationStatus startRelaysFromPredecessorsAfterMigration(PipelineOperationStatus status) {
+        // Step 3: relays from the predecessors after migration to the target element
+        List<SpDataStreamRelayContainer> targetRelays =
+                findRelays(predecessorsAfterMigration, migrationEntity.getTargetElement());
+
+        // remembered for persisting before the start is attempted, i.e. regardless of its outcome
+        updateRelaysToBePersisted(targetRelays);
+        PipelineOperationStatus startResult = startRelays(targetRelays);
+        updateMigrationStatus(startResult, status);
+        return startResult;
+    }
+
+    private PipelineOperationStatus stopRelaysFromPredecessorsBeforeMigration(PipelineOperationStatus status) {
+        // Step 2: relays from the predecessors before migration to the origin element
+        List<SpDataStreamRelayContainer> originRelays =
+                findRelays(predecessorsBeforeMigration, migrationEntity.getSourceElement());
+
+        // remembered for deletion before the stop is attempted, i.e. regardless of its outcome
+        updateRelaysToBeDeleted(originRelays);
+        PipelineOperationStatus stopResult = stopRelays(originRelays);
+        updateMigrationStatus(stopResult, status);
+        return stopResult;
+    }
+
+    private PipelineOperationStatus startTargetPipelineElementsAndRelays(PipelineOperationStatus status) {
+        // Step 1: the target element is submitted with decrypted secrets, together with the relays derived from it
+        List<InvocableStreamPipesEntity> decryptedTarget =
+                decryptSecrets(Collections.singletonList(migrationEntity.getTargetElement()));
+        List<SpDataStreamRelayContainer> targetRelays = extractRelaysFromDataProcessor(decryptedTarget);
+
+        updateRelaysToBePersisted(targetRelays);
+
+        PipelineOperationStatus startResult = startPipelineElementsAndRelays(decryptedTarget, targetRelays);
+        updateMigrationStatus(startResult, status);
+        return startResult;
+    }
+
+    private PipelineOperationStatus stopOriginPipelineElementsAndRelays(PipelineOperationStatus status) {
+        // Steps 4 and 5: the origin element is stopped together with the relays derived from it
+        List<InvocableStreamPipesEntity> origin = Collections.singletonList(migrationEntity.getSourceElement());
+        List<SpDataStreamRelayContainer> originRelays = extractRelaysFromDataProcessor(origin);
+
+        updateRelaysToBeDeleted(originRelays);
+        PipelineOperationStatus stopResult = stopPipelineElementsAndRelays(origin, originRelays);
+        updateMigrationStatus(stopResult, status);
+
+        return stopResult;
+    }
+
+    // Helpers
+
+    private void updateRelaysToBePersisted(List<SpDataStreamRelayContainer> relays) {
+        // remember every container that actually carries output stream relays; empty ones are skipped
+        relays.stream().filter(container -> !container.getOutputStreamRelays().isEmpty())
+                .forEach(container -> relaysToBePersisted.add(container));
+    }
+
+    private void updateRelaysToBeDeleted(List<SpDataStreamRelayContainer> relays) {
+        // mark every container that actually carries output stream relays for deletion; empty ones are skipped
+        relays.stream().filter(container -> !container.getOutputStreamRelays().isEmpty())
+                .forEach(container -> relaysToBeDeleted.add(container));
+    }
+
+    private List<SpDataStreamRelayContainer> findRelaysAndFilterById(Set<String> relayIdsToRollback,
+            List<NamedStreamPipesEntity> predecessor, InvocableStreamPipesEntity target) {
+        // of all relays between predecessor and target, keep those whose running instance id is in relayIdsToRollback
+        return findRelays(predecessor, target).stream()
+                .filter(candidate -> relayIdsToRollback.contains(candidate.getRunningStreamRelayInstanceId()))
+                .collect(Collectors.toList());
+    }
+
+    private void findAndComparePredecessorsInCurrentPipeline(PipelineGraph pipelineGraphBeforeMigration) {
+        predecessorsAfterMigration.stream().map(after -> findMatching(after, pipelineGraphBeforeMigration))
+                .forEachOrdered(predecessorsBeforeMigration::add); // one findMatching result each, same order
+    }
+
+    private void findPredecessorsInMigrationPipeline(PipelineGraph pipelineGraphAfterMigration) {
+        PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).forEach(stream -> // one lookup per stream
+                predecessorsAfterMigration.addAll(getPredecessors(stream, migrationEntity.getTargetElement(),
+                        pipelineGraphAfterMigration, new ArrayList<>()))); // results accumulate in the field
+    }
+
+    private List<SpDataSet> findDataSets() {
+        return pipeline.getStreams().stream()
+                .filter(s -> s instanceof SpDataSet)
+                .map(s -> new SpDataSet((SpDataSet) s))
+                .collect(Collectors.toList());
+    }
+
+    private void buildGraphWithRelays(PipelineGraph pipelineGraphAfterMigration) { // buildGraphs() result is unused
+        new InvocationGraphBuilder(pipelineGraphAfterMigration, pipeline.getPipelineId()).buildGraphs();
+    }
+
+    private void purgeExistingRelays() { // every processor of the pipeline gets its own new, empty relay list
+        pipeline.getSepas().forEach(processor -> processor.setOutputStreamRelays(new ArrayList<>()));
+    }
+
+    private void updateMigrationStatus(PipelineOperationStatus partialStatus, PipelineOperationStatus status) {
+        // copy each element status of the partial result over to the global migration status
+        partialStatus.getElementStatus().forEach(elementStatus -> status.addPipelineElementStatus(elementStatus));
+    }
+
+    private List<SpDataStreamRelayContainer> extractRelaysFromDataProcessor(List<InvocableStreamPipesEntity> graphs) {
+        // entries are cast to DataProcessorInvocation; any other entry type raises a ClassCastException
+        return graphs.stream()
+                .map(graph -> new SpDataStreamRelayContainer((DataProcessorInvocation) graph))
+                .collect(Collectors.toList());
+    }
+
+    private Set<String> extractUniqueRelayIds(PipelineOperationStatus status) {
+        // ids of all elements that status reports as successful, without duplicates
+        return status.getElementStatus().stream()
+                .filter(element -> element.isSuccess()).map(element -> element.getElementId())
+                .collect(Collectors.toSet());
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
index c404896..2258cb9 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
@@ -15,16 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-
-import javax.ws.rs.Path;
-
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+public class PipelineMigrationHelpers { // empty placeholder: the class has no members in this change
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineStorageService.java
similarity index 98%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineStorageService.java
index 9df92ac..23fc3e2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineStorageService.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.manager.execution.http;
+package org.apache.streampipes.manager.execution.pipeline;
 
 import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphBuilder;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
index c404896..fd4bdd9 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
@@ -15,16 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-
-import javax.ws.rs.Path;
-
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+public interface Command { // implemented in this change by four classes whose method bodies are all still empty
+    void execute();
+    void rollback(); // NOTE(review): presumably meant to undo execute() - no implementation yet to confirm
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
index c404896..4afbf46 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+public class PipelineElementStartCommand implements Command { // placeholder: both methods are empty so far
 
-import javax.ws.rs.Path;
+    @Override
+    public void execute() {
 
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+    }
+
+    @Override
+    public void rollback() {
+
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
similarity index 65%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
index bb186e8..5a2bbd6 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
 
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+public class PipelineElementStopCommand implements Command { // placeholder: both methods are empty so far
+    @Override
+    public void execute() {
 
-import javax.ws.rs.Path;
+    }
 
-//@Path("/api/v2/node/element/sepa")
-//public class DataProcessorPipelineElementResource extends InvocableEntityResource<DataProcessorInvocation> {
-//
-//    public DataProcessorPipelineElementResource() {
-//        super(DataProcessorInvocation.class);
-//    }
-//}
+    @Override
+    public void rollback() {
+
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
index c404896..b2e242d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+public class RelayStartCommand implements Command { // placeholder: both methods are empty so far
+    @Override
+    public void execute() {
 
-import javax.ws.rs.Path;
+    }
 
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+    @Override
+    public void rollback() {
+
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
index c404896..1938268 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+public class RelayStopCommand implements Command {
+    @Override
+    public void execute() {
 
-import javax.ws.rs.Path;
+    }
 
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+    @Override
+    public void rollback() {
+
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index f7cae4f..533b8a2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -28,7 +28,7 @@ import org.apache.streampipes.manager.data.PipelineGraphHelpers;
 import org.apache.streampipes.manager.matching.output.OutputSchemaFactory;
 import org.apache.streampipes.manager.matching.output.OutputSchemaGenerator;
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
new file mode 100644
index 0000000..4d3f6d5
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.migration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.manager.execution.pipeline.PipelineMigrationExecutor;
+import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class PipelineElementMigrationHandler {
+
+    private final PipelineOperationStatus pipelineMigrationStatus;
+    private final Pipeline desiredPipeline;
+    private final Pipeline migrationPipeline;
+    private Pipeline currentPipeline;
+    private final boolean visualize;
+    private final boolean storeStatus;
+    private final boolean monitor;
+
+    public PipelineElementMigrationHandler(Pipeline desiredPipeline, boolean visualize, boolean storeStatus,
+                                           boolean monitor) {
+        this.pipelineMigrationStatus = new PipelineOperationStatus();
+        this.desiredPipeline = desiredPipeline;
+        this.currentPipeline = getPipelineById(desiredPipeline.getPipelineId());
+        this.migrationPipeline = getPipelineById(desiredPipeline.getPipelineId());
+        this.visualize = visualize;
+        this.storeStatus = storeStatus;
+        this.monitor = monitor;
+    }
+
+    public PipelineOperationStatus handlePipelineMigration() {
+        migratePipelineElementOrRollback();
+        return verifyPipelineMigrationStatus(pipelineMigrationStatus,
+                "Successfully migrated Pipeline Elements in Pipeline " + desiredPipeline.getName(),
+                "Could not migrate all Pipeline Elements in Pipeline " + desiredPipeline.getName());
+    }
+
+    private void migratePipelineElementOrRollback() {
+        List<PipelineElementMigrationEntity> pendingMigrations = getPipelineDelta(desiredPipeline, migrationPipeline);
+
+        // every entity is migrated on its own; a failed one is swapped back and the loop moves on
+        for (PipelineElementMigrationEntity entity : pendingMigrations) {
+            swapPipelineElement(migrationPipeline, entity);
+            PipelineOperationStatus entityStatus = migratePipelineElement(entity);
+
+            // collect the per-element results in the overall migration status
+            entityStatus.getElementStatus()
+                    .forEach(this.pipelineMigrationStatus::addPipelineElementStatus);
+            if (!entityStatus.isSuccess()) {
+                // roll back: an entity with source and target exchanged restores the previous element
+                swapPipelineElement(migrationPipeline,
+                        new PipelineElementMigrationEntity(entity.getTargetElement(), entity.getSourceElement()));
+                continue;
+            }
+            try {
+                // successful step: snapshot the migrated state as the new current pipeline
+                currentPipeline = deepCopyPipeline(migrationPipeline);
+            } catch (JsonProcessingException e) {
+                throw new SpRuntimeException("Could not deep copy pipeline for migration: " + e.getMessage(), e);
+            }
+        }
+        // the migration pipeline, including any rolled-back elements, is handed to Operations.overwritePipeline
+        Operations.overwritePipeline(migrationPipeline);
+    }
+
+    private PipelineOperationStatus migratePipelineElement(PipelineElementMigrationEntity migrationEntity) {
+        return new PipelineMigrationExecutor(migrationPipeline, currentPipeline, migrationEntity, visualize,
+                storeStatus, monitor).migratePipelineElement();
+    }
+
+
+    // Helpers
+
+    private Pipeline getPipelineById(String pipelineId) {
+        return getPipelineStorageApi().getPipeline(pipelineId);
+    }
+
+    private IPipelineStorage getPipelineStorageApi() {
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
+    }
+
+    private List<PipelineElementMigrationEntity> getPipelineDelta(Pipeline pipelineX, Pipeline pipelineY){
+        List<PipelineElementMigrationEntity> delta = new ArrayList<>();
+        pipelineX.getSepas().forEach(iX -> {
+            if (pipelineY.getSepas().stream().filter(iY -> iY.getElementId().equals(iX.getElementId()))
+                    .noneMatch(iY -> iY.getDeploymentTargetNodeId().equals(iX.getDeploymentTargetNodeId()))){
+                Optional<DataProcessorInvocation> invocationY = pipelineY.getSepas().stream().
+                        filter(iY -> iY.getDeploymentRunningInstanceId().equals(iX.getDeploymentRunningInstanceId())).findFirst();
+                invocationY.ifPresent(dataProcessorInvocation -> delta.add(new PipelineElementMigrationEntity(dataProcessorInvocation, iX)));
+            }
+        });
+        return delta;
+    }
+
+    // Marks the status as successful only if every element status is successful, then sets the matching title.
+    // Duplicate from method in GraphSubmitter
+    private PipelineOperationStatus verifyPipelineMigrationStatus(PipelineOperationStatus status, String successMessage,
+                                                                  String errorMessage) {
+        boolean allElementsSucceeded = status.getElementStatus().stream()
+                .allMatch(PipelineElementStatus::isSuccess);
+        status.setSuccess(allElementsSucceeded);
+        // the title is chosen from the flag as read back from the status object
+        status.setTitle(status.isSuccess() ? successMessage : errorMessage);
+        return status;
+    }
+
+    private void swapPipelineElement(Pipeline exchangePipeline, PipelineElementMigrationEntity migrationEntity) {
+        // only data processor targets are swapped; the target takes over the source element's list position
+        if (migrationEntity.getTargetElement() instanceof DataProcessorInvocation) {
+            List<DataProcessorInvocation> processors = exchangePipeline.getSepas();
+            int position = processors.indexOf(migrationEntity.getSourceElement());
+            processors.set(position, (DataProcessorInvocation) migrationEntity.getTargetElement());
+        }
+    }
+
+    public static Pipeline deepCopyPipeline(Pipeline object) throws JsonProcessingException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper.readValue(objectMapper.writeValueAsString(object), Pipeline.class);
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
index b5803d5..dce1e06 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
@@ -17,8 +17,11 @@
  */
 package org.apache.streampipes.manager.node;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.ContentType;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
@@ -30,25 +33,50 @@ public abstract class AbstractClusterManager {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterManager.class.getCanonicalName());
 
     private static final String PROTOCOL = "http://";
-    private static final String SLASH = "/";
     private static final String COLON = ":";
     private static final long RETRY_INTERVAL_MS = 5000;
-    private static final Object BASE_NODE_CONTROLLER_INFO_ROUTE = "/api/v2/node/info";
     private static final int CONNECT_TIMEOUT = 1000;
+    private static final String BASE_NODE_CONTROLLER_INFO_ROUTE = "/api/v2/node/info";
+    private static final String BASE_NODE_CONTROLLER_RELAY_ROUTE = "/api/v2/node/stream/relay";
 
-    protected static boolean syncStateUpdateWithRemoteNodeController(NodeInfoDescription desc, boolean activate) {
+    public enum RequestOptions {
+        GET,POST,PUT,DELETE
+    }
+
+    protected static <T> boolean syncWithNodeController(T element, NodeSyncOptions sync) {
+        // maps each option to sub-route, HTTP verb, body flag (false = element is not sent) and element type
+        final Class<?> nodeType = NodeInfoDescription.class;
+        switch (sync) {
+            case ACTIVATE_NODE:   return sync(element, "/activate", RequestOptions.POST, false, nodeType);
+            case DEACTIVATE_NODE: return sync(element, "/deactivate", RequestOptions.POST, false, nodeType);
+            case UPDATE_NODE:     return sync(element, "", RequestOptions.PUT, true, nodeType);
+            case RESTART_RELAYS:
+                return sync(element, "/invoke", RequestOptions.POST, true, SpDataStreamRelayContainer.class);
+            default:
+                // unknown option: nothing is sent
+                return false;
+        }
+    }
+
+    private static <T> boolean sync(T element, String subroute, RequestOptions request, boolean withBody, Class<?> type) {
         boolean synced = false;
-        String url;
-        if (activate) {
-            url = generateEndpoint(desc, "/activate");
-        } else {
-            url = generateEndpoint(desc, "/deactivate");
+
+        String body = "{}";
+        if (withBody) {
+            body = jackson(element);
         }
-        LOG.info("Trying to sync state update with node controller=" + url);
+
+        String url = generateEndpoint(element, subroute, type);
+        LOG.info("Trying to sync with node controller=" + url);
 
         boolean connected = false;
         while (!connected) {
-            connected = post(url);
+            // call the node controller REST endpoint; each case ends with break so POST does not fall through to PUT
+            switch (request) {
+                case POST: connected = post(url, body); break;
+                case PUT : connected = put(url, body); break;
+            }
+
             if (!connected) {
                 LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 10000));
                 try {
@@ -57,46 +85,43 @@ public abstract class AbstractClusterManager {
                     e.printStackTrace();
                 }
             }
-        }
-        synced = true;
-        return synced;
-    }
-
-    protected static boolean syncWithRemoteNodeController(NodeInfoDescription desc) {
-        boolean synced = false;
-        try {
-            String body = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
-            String url = generateEndpoint(desc);
-            LOG.info("Trying to sync description updates with node controller=" + url);
-
-            boolean connected = false;
-            while (!connected) {
-                connected = put(url, body);
-                if (!connected) {
-                    LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 10000));
-                    try {
-                        Thread.sleep(RETRY_INTERVAL_MS);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
             synced = true;
-        } catch (IOException e) {
-            e.printStackTrace();
         }
+        LOG.info("Successfully synced with node controller=" + url);
         return synced;
     }
 
-    protected static String generateEndpoint(NodeInfoDescription desc) {
-        return generateEndpoint(desc, "");
+    // Helpers
+
+    private static <T> String generateEndpoint(T desc, String subroute, Class<?> type) {
+        // builds http://<host>:<port><base route><subroute>; host, port and base route depend on the element type
+        final String authority;
+        final String baseRoute;
+        if (type.equals(NodeInfoDescription.class)) {
+            NodeInfoDescription node = (NodeInfoDescription) desc;
+            authority = PROTOCOL + node.getHostname() + COLON + node.getPort();
+            baseRoute = BASE_NODE_CONTROLLER_INFO_ROUTE;
+        } else {
+            // every other type is cast to a relay container
+            SpDataStreamRelayContainer relay = (SpDataStreamRelayContainer) desc;
+            authority = PROTOCOL
+                    + relay.getDeploymentTargetNodeHostname()
+                    + COLON
+                    + relay.getDeploymentTargetNodePort();
+            baseRoute = BASE_NODE_CONTROLLER_RELAY_ROUTE;
+        }
+        return authority + baseRoute + subroute;
     }
 
-    protected static String generateEndpoint(NodeInfoDescription desc, String subroute) {
-        return PROTOCOL + desc.getHostname() + COLON + desc.getPort() + BASE_NODE_CONTROLLER_INFO_ROUTE + subroute;
+    private static <T> String jackson(T desc) {
+        try { // serialize the element to the JSON string that is used as request body
+            return JacksonSerializer.getObjectMapper().writeValueAsString(desc);
+        } catch (JsonProcessingException e) {
+            throw new SpRuntimeException("Could not serialize node controller description", e); // keep the cause
+        }
     }
 
-    protected static boolean put(String url, String body) {
+    private static boolean put(String url, String body) {
         try {
             Request.Put(url)
                     .bodyString(body, ContentType.APPLICATION_JSON)
@@ -109,10 +134,10 @@ public abstract class AbstractClusterManager {
         return false;
     }
 
-    protected static boolean post(String url) {
+    private static boolean post(String url, String body) {
         try {
             Request.Post(url)
-                    .bodyString("{}", ContentType.APPLICATION_JSON)
+                    .bodyString(body, ContentType.APPLICATION_JSON)
                     .connectTimeout(CONNECT_TIMEOUT)
                     .execute();
             return true;
@@ -121,4 +146,5 @@ public abstract class AbstractClusterManager {
         }
         return false;
     }
+
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
index c2d9278..2071e12 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
@@ -28,6 +28,7 @@ import java.util.List;
 
 import javax.ws.rs.core.MediaType;
 
+@Deprecated
 public class AvailableNodesFetcher {
 
     public AvailableNodesFetcher() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
index 3f58f2f..007dc15 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
@@ -17,9 +17,13 @@
  */
 package org.apache.streampipes.manager.node;
 
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.message.NotificationType;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.storage.api.INodeDataStreamRelay;
+import org.apache.streampipes.storage.api.INodeInfoStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,70 +32,132 @@ import java.util.List;
 import java.util.Optional;
 
 public class NodeClusterManager extends AbstractClusterManager {
-
     private static final Logger LOG = LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
 
+
     public static List<NodeInfoDescription> getAvailableNodes() {
         //return new AvailableNodesFetcher().fetchNodes();
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllActiveNodes();
+        return getNodeStorageApi().getAllActiveNodes();
     }
 
+
     public static List<NodeInfoDescription> getAllNodes() {
         //return new AvailableNodesFetcher().fetchNodes();
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
+        return getNodeStorageApi().getAllNodes();
     }
 
     public static Message updateNode(NodeInfoDescription desc) {
-        boolean successfullyUpdated = syncWithRemoteNodeController(desc);
+        boolean successfullyUpdated = syncWithNodeController(desc, NodeSyncOptions.UPDATE_NODE);
         if (successfullyUpdated) {
-            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().updateNode(desc);
+            getNodeStorageApi().updateNode(desc);
             return Notifications.success("Node updated");
         }
         return Notifications.error("Could not update node");
     }
 
     public static boolean deactivateNode(String nodeControllerId) {
-        Optional<NodeInfoDescription> storedNode =
-                StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(nodeControllerId);
+        Optional<NodeInfoDescription> storedNode = getNodeStorageApi().getNode(nodeControllerId);
         boolean status = false;
         if (storedNode.isPresent()) {
-            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deactivateNode(nodeControllerId);
-            status = syncStateUpdateWithRemoteNodeController(storedNode.get(), false);
+            getNodeStorageApi().deactivateNode(nodeControllerId);
+            status = syncWithNodeController(storedNode.get(), NodeSyncOptions.DEACTIVATE_NODE);
         }
         return status;
     }
 
     public static boolean activateNode(String nodeControllerId) {
-        Optional<NodeInfoDescription> storedNode =
-                StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(nodeControllerId);
+        Optional<NodeInfoDescription> storedNode = getNodeStorageApi().getNode(nodeControllerId);
         boolean status = false;
         if (storedNode.isPresent()) {
-            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().activateNode(nodeControllerId);
-            status = syncStateUpdateWithRemoteNodeController(storedNode.get(), true);
+            getNodeStorageApi().activateNode(nodeControllerId);
+            status = syncWithNodeController(storedNode.get(), NodeSyncOptions.ACTIVATE_NODE);
         }
         return status;
     }
 
-    public static void addNode(NodeInfoDescription desc) {
-        List<NodeInfoDescription> allNodes =
-                StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
+    public static Message addOrRejoin(NodeInfoDescription desc) {
+        Optional<NodeInfoDescription> latestDesc = getLatestNodeOrElseEmpty(desc.getNodeControllerId());
 
         boolean alreadyRegistered = false;
-        if (allNodes.size() > 0) {
-            alreadyRegistered = allNodes.stream()
-                    .anyMatch(n -> n.getNodeControllerId().equals(desc.getNodeControllerId()));
+        if (latestDesc.isPresent()) {
+            alreadyRegistered = true;
         }
 
         if (!alreadyRegistered) {
-            LOG.info("New cluster node join registration request on from http://{}:{}", desc.getHostname(), desc.getPort());
-            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().storeNode(desc);
-            LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
+            LOG.info("New cluster node join request from http://{}:{}", desc.getHostname(), desc.getPort());
+            return addNewNode(desc);
         } else {
             LOG.info("Re-joined cluster node from http://{}:{}", desc.getHostname(), desc.getPort());
+            return rejoinAndSyncNode(latestDesc.get());
+        }
+    }
+
+    private static Optional<NodeInfoDescription> getLatestNodeOrElseEmpty(String nodeControllerId) {
+        return getNodeStorageApi().getAllNodes().stream()
+                .filter(n -> n.getNodeControllerId().equals(nodeControllerId))
+                .findAny();
+    }
+
+    private static Message addNewNode(NodeInfoDescription desc) throws RuntimeException {
+        try { // store the node description; success is logged and reported as NODE_JOIN_SUCCESS
+            getNodeStorageApi().storeNode(desc);
+            LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
+            return Notifications.success(NotificationType.NODE_JOIN_SUCCESS);
+        } catch (Exception e) { // storing failed: report an error message instead of a success message
+            return Notifications.error(NotificationType.NODE_JOIN_ERROR);
+        }
+    }
+
+    private static Message rejoinAndSyncNode(NodeInfoDescription desc) {
+        // push the given description to the node first; relays are only restarted once that sync succeeded
+        LOG.info("Sync latest node description to http://{}:{}", desc.getHostname(), desc.getPort());
+        if (syncWithNodeController(desc, NodeSyncOptions.UPDATE_NODE)) {
+            return restartRelays(desc);
+        }
+        return Notifications.error(NotificationType.NODE_JOIN_ERROR); // a failed sync is an error, not a success
+    }
+
+    private static Message restartRelays(NodeInfoDescription desc) {
+        // re-send every relay container stored for this node controller id; an empty list sends nothing
+        List<SpDataStreamRelayContainer> storedRelays = getDataStreamRelay(desc.getNodeControllerId());
+        for (SpDataStreamRelayContainer relay : storedRelays) {
+            LOG.info("Sync active relays name={} to http://{}:{}", relay.getName(), desc.getHostname(),
+                    desc.getPort());
+            // the boolean returned by the sync call is not evaluated
+            syncWithNodeController(relay, NodeSyncOptions.RESTART_RELAYS);
         }
+        return Notifications.success(NotificationType.NODE_JOIN_SUCCESS);
     }
 
     public static void deleteNode(String nodeControllerId) {
-        StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deleteNode(nodeControllerId);
+        getNodeStorageApi().deleteNode(nodeControllerId);
     }
+
+
+    public static void persistDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
+        getNodeDataStreamRelayStorageApi().addRelayContainer(relayContainer);
+    }
+
+    public static List<SpDataStreamRelayContainer> getDataStreamRelay(String nodeControllerId) {
+        return getNodeDataStreamRelayStorageApi().getAllByNodeControllerId(nodeControllerId);
+    }
+
+    public static void updateDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
+        getNodeDataStreamRelayStorageApi().updateRelayContainer(relayContainer);
+    }
+
+    public static void deleteDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
+        getNodeDataStreamRelayStorageApi().deleteRelayContainer(relayContainer);
+    }
+
+    // Helpers
+
+    private static INodeInfoStorage getNodeStorageApi() {
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage();
+    }
+
+    private static INodeDataStreamRelay getNodeDataStreamRelayStorageApi(){
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeDataStreamRelayStorage();
+    }
+
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
similarity index 67%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
index c404896..644e688 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
@@ -15,16 +15,8 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.node;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-
-import javax.ws.rs.Path;
-
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+public enum NodeSyncOptions {
+    ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE,RESTART_RELAYS;
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 7861a6f..6047b1d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -18,16 +18,15 @@
 
 package org.apache.streampipes.manager.operations;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.manager.endpoint.EndpointItemFetcher;
-import org.apache.streampipes.manager.execution.http.MigrationHelpers;
-import org.apache.streampipes.manager.execution.http.PipelineExecutor;
-import org.apache.streampipes.manager.execution.http.PipelineStorageService;
+import org.apache.streampipes.manager.execution.pipeline.PipelineExecutor;
+import org.apache.streampipes.manager.execution.pipeline.PipelineStorageService;
 import org.apache.streampipes.manager.matching.DataSetGroundingSelector;
 import org.apache.streampipes.manager.matching.PipelineVerificationHandler;
+import org.apache.streampipes.manager.migration.PipelineElementMigrationHandler;
 import org.apache.streampipes.manager.recommender.ElementRecommender;
 import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
 import org.apache.streampipes.manager.runtime.PipelineElementRuntimeInfoFetcher;
@@ -38,28 +37,20 @@ import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
 import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
 import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.message.DataSetModificationMessage;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.message.PipelineModificationMessage;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementRecommendationMessage;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.pipeline.*;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
 import org.apache.streampipes.model.template.PipelineTemplateDescription;
 import org.apache.streampipes.model.template.PipelineTemplateInvocation;
-import org.apache.streampipes.sdk.helpers.Tuple2;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
 
 
 /**
@@ -196,35 +187,22 @@ public class Operations {
     return PipelineElementRuntimeInfoFetcher.INSTANCE.getCurrentData(spDataStream);
   }
 
-  public static PipelineOperationStatus migratePipelineProcessors(Pipeline newPipeline, boolean visualize, boolean storeStatus,
-                                                                  boolean monitor) throws SpRuntimeException{
-
-
-    Pipeline currentPipeline = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getPipeline(newPipeline.getPipelineId());
-    Pipeline migrationPipeline = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getPipeline(newPipeline.getPipelineId());
-    PipelineOperationStatus status = new PipelineOperationStatus();
-
-    for(Tuple2<DataProcessorInvocation, DataProcessorInvocation> t : MigrationHelpers.getDelta(newPipeline, migrationPipeline)){
-      MigrationHelpers.exchangePipelineElement(migrationPipeline, t);
-
-      PipelineOperationStatus migrationStatus = new PipelineExecutor(migrationPipeline, visualize, storeStatus, monitor)
-              .migratePipelineElement(currentPipeline, t);
-      migrationStatus.getElementStatus().forEach(status::addPipelineElementStatus);
-      if (migrationStatus.isSuccess()) {
-        try {
-          currentPipeline = MigrationHelpers.deepCopyPipeline(migrationPipeline);
-        } catch (JsonProcessingException e) {
-          throw new SpRuntimeException(e);
-        }
-      } else {
-        Tuple2<DataProcessorInvocation, DataProcessorInvocation> failedMigration = new Tuple2<>(t.b, t.a);
-        MigrationHelpers.exchangePipelineElement(migrationPipeline, failedMigration);
-      }
-    }
-
-    overwritePipeline(migrationPipeline);
-    return MigrationHelpers.verifyPipelineOperationStatus(status,
-            "Successfully migrated Pipeline Elements in Pipeline " + newPipeline.getName(),
-            "Could not migrate all Pipeline Elements in Pipeline " + newPipeline.getName());
+  /**
+   * Runs the migration of pipeline elements towards the given desired pipeline.
+   * <p>
+   * The call is handed over to a {@link PipelineElementMigrationHandler} that is created with the
+   * arguments passed in here.
+   *
+   * @param desiredPipeline pipeline description the migration is based on
+   * @param visualize       forwarded unchanged to the migration handler
+   * @param storeStatus     forwarded unchanged to the migration handler
+   * @param monitor         forwarded unchanged to the migration handler
+   * @return the status returned by {@code PipelineElementMigrationHandler#handlePipelineMigration()}
+   * @throws SpRuntimeException presumably propagated from the migration handler (its code is not shown here)
+   */
+  public static PipelineOperationStatus handlePipelineElementMigration(Pipeline desiredPipeline, boolean visualize,
+                                                                       boolean storeStatus, boolean monitor) throws SpRuntimeException {
+    PipelineElementMigrationHandler migrationHandler =
+            new PipelineElementMigrationHandler(desiredPipeline, visualize, storeStatus, monitor);
+    return migrationHandler.handlePipelineMigration();
   }
+
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
index d0f8c9f..e7db8e0 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
@@ -40,8 +40,7 @@ public class Node extends AbstractRestResource implements INode {
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response addNode(@PathParam("username") String username, NodeInfoDescription desc) {
-        NodeClusterManager.addNode(desc);
-        return statusMessage(Notifications.success(NotificationType.STORAGE_SUCCESS));
+        return statusMessage(NodeClusterManager.addOrRejoin(desc));
     }
 
     @PUT
@@ -51,7 +50,8 @@ public class Node extends AbstractRestResource implements INode {
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response updateNode(@PathParam("username") String username,
-                               @PathParam("nodeControllerId") String nodeControllerId, NodeInfoDescription desc) {
+                               @PathParam("nodeControllerId") String nodeControllerId,
+                               NodeInfoDescription desc) {
         return statusMessage(NodeClusterManager.updateNode(desc));
     }
 
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index 28c0dee..e8f005e 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -270,11 +270,12 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @JacksonSerialized
   @Operation(summary = "Migrate pipeline elements to new node",
           tags = {"Pipeline"})
-  public Response migratePipelineProcessors(@PathParam("username") String username,
-                                            @PathParam("pipelineId") String pipelineId,
-                                            Pipeline pipelineNew) {
+  public Response migratePipelineElements(@PathParam("username") String username,
+                                          @PathParam("pipelineId") String pipelineId,
+                                          Pipeline desiredPipeline) {
     try {
-      PipelineOperationStatus status = Operations.migratePipelineProcessors(pipelineNew, true, true, true);
+      PipelineOperationStatus status = Operations.handlePipelineElementMigration(desiredPipeline,
+              true, true, true);
       return ok(status);
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java b/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
index cf056c2..8a6f628 100644
--- a/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
+++ b/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
@@ -24,6 +24,8 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.streampipes.model.*;
 import org.apache.streampipes.model.connect.rules.value.*;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.connect.adapter.*;
 import org.apache.streampipes.model.connect.rules.schema.CreateNestedRuleDescription;
diff --git a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
index b9ee0fa..d3da942 100644
--- a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
+++ b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
@@ -41,6 +41,8 @@ import org.apache.streampipes.model.dashboard.DashboardWidgetModel;
 import org.apache.streampipes.model.dashboard.VisualizablePipeline;
 import org.apache.streampipes.model.datalake.DataExplorerWidgetModel;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.graph.*;
 import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
diff --git a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
index 55b4781..84ae05e 100644
--- a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
+++ b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
@@ -70,6 +70,7 @@ public class JsonLdTransformer implements RdfTransformer {
           StreamPipes.DASHBOARD_WIDGET_MODEL,
           StreamPipes.DASHBOARD_MODEL,
           StreamPipes.DATA_EXPLORER_WIDGET_MODEL,
+          StreamPipes.DATA_STREAM_RELAY,
           StreamPipes.DATA_STREAM_RELAY_CONTAINER,
           StreamPipes.NODE_INFO_DESCRIPTION,
           StreamPipes.DEPLOYMENT_CONTAINER,
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
index 06fdfea..4d84cca 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
@@ -58,4 +58,6 @@ public interface INoSqlStorage {
   IPipelineElementTemplateStorage getPipelineElementTemplateStorage();
 
   INodeInfoStorage getNodeStorage();
+
+  INodeDataStreamRelay getNodeDataStreamRelayStorage();
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeDataStreamRelay.java
similarity index 56%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
copy to streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeDataStreamRelay.java
index 0ad32ee..67e052b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeDataStreamRelay.java
@@ -15,19 +15,25 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.management.pe;
+package org.apache.streampipes.storage.api;
 
-import org.apache.streampipes.container.model.node.InvocableRegistration;
-import org.apache.streampipes.model.Response;
 
-public interface PipelineElementLifeCycle {
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 
-    void register(InvocableRegistration registration);
+import java.util.List;
+import java.util.Optional;
 
-    Response invoke(String endpoint, String payload);
+public interface INodeDataStreamRelay {
 
-    Response detach(String runningInstanceId);
+    List<SpDataStreamRelayContainer> getAll();
 
-    void unregister();
+    List<SpDataStreamRelayContainer> getAllByNodeControllerId(String id);
 
+    void addRelayContainer(SpDataStreamRelayContainer relayContainer);
+
+    Optional<SpDataStreamRelayContainer> getRelayContainerById(String id);
+
+    void updateRelayContainer(SpDataStreamRelayContainer relayContainer);
+
+    void deleteRelayContainer(SpDataStreamRelayContainer relayContainer);
 }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
index 269fb42..739c22b 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
@@ -120,5 +120,8 @@ public enum CouchDbStorageManager implements INoSqlStorage {
     return new NodeInfoStorageImpl();
   }
 
-
+  @Override
+  public INodeDataStreamRelay getNodeDataStreamRelayStorage() {
+    return new NodeDataStreamRelayImpl();
+  }
 }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeDataStreamRelayImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeDataStreamRelayImpl.java
new file mode 100644
index 0000000..2564418
--- /dev/null
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeDataStreamRelayImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.storage.couchdb.impl;
+
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.storage.api.INodeDataStreamRelay;
+import org.apache.streampipes.storage.couchdb.dao.AbstractDao;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * CouchDB-backed store for {@link SpDataStreamRelayContainer} documents.
+ * <p>
+ * Lookups by node controller id or by running relay instance id load all stored containers via
+ * {@link #getAll()} and filter them in memory.
+ */
+public class NodeDataStreamRelayImpl extends AbstractDao<SpDataStreamRelayContainer> implements INodeDataStreamRelay {
+
+    public NodeDataStreamRelayImpl() {
+        super(Utils::getCouchDbNodeDataStreamRelayClient, SpDataStreamRelayContainer.class);
+    }
+
+    /**
+     * Returns every stored relay container.
+     *
+     * @return the result of {@code findAll()}
+     */
+    @Override
+    public List<SpDataStreamRelayContainer> getAll() {
+        return findAll();
+    }
+
+    /**
+     * Collects the relay containers whose deployment target node id equals the given id.
+     *
+     * @param id node controller id to match; {@code null} matches nothing
+     * @return matching containers, possibly empty
+     */
+    @Override
+    public List<SpDataStreamRelayContainer> getAllByNodeControllerId(String id) {
+        return getAll().stream()
+                // compare from the argument so a stored container without a node id is skipped instead of raising an NPE
+                .filter(e -> id != null && id.equals(e.getDeploymentTargetNodeId()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Stores the given relay container via {@code persist}.
+     *
+     * @param relayContainer container to store
+     */
+    @Override
+    public void addRelayContainer(SpDataStreamRelayContainer relayContainer) {
+        persist(relayContainer);
+    }
+
+    /**
+     * Looks up a relay container by its running stream relay instance id.
+     *
+     * @param s running stream relay instance id; {@code null} matches nothing
+     * @return any container carrying that id, or an empty {@code Optional}
+     */
+    @Override
+    public Optional<SpDataStreamRelayContainer> getRelayContainerById(String s) {
+        return getAll().stream()
+                // compare from the argument so a stored container without an instance id is skipped instead of raising an NPE
+                .filter(e -> s != null && s.equals(e.getRunningStreamRelayInstanceId()))
+                .findAny();
+    }
+
+    /**
+     * Updates the stored container that has the same running stream relay instance id.
+     * <p>
+     * The CouchDB id and revision of the stored document are copied onto {@code relayContainer}
+     * before calling {@code update}. Nothing happens when no stored container matches.
+     *
+     * @param relayContainer container carrying the new state
+     */
+    @Override
+    public void updateRelayContainer(SpDataStreamRelayContainer relayContainer) {
+        getRelayContainerById(relayContainer.getRunningStreamRelayInstanceId()).ifPresent(stored -> {
+            relayContainer.setCouchDbId(stored.getCouchDbId());
+            relayContainer.setCouchDbRev(stored.getCouchDbRev());
+            update(relayContainer);
+        });
+    }
+
+    /**
+     * Deletes the stored container that has the same running stream relay instance id, if any.
+     *
+     * @param relayContainer container identifying the document to delete
+     */
+    @Override
+    public void deleteRelayContainer(SpDataStreamRelayContainer relayContainer) {
+        getRelayContainerById(relayContainer.getRunningStreamRelayInstanceId())
+                .ifPresent(stored -> delete(stored.getCouchDbId()));
+    }
+}
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
index c1464ba..be64b45 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
@@ -144,6 +144,17 @@ public class Utils {
     return dbClient;
   }
 
+  /**
+   * Creates a CouchDB client from the properties returned by {@code props("relays")}.
+   *
+   * @return a new client whose Gson builder is the one from {@code GsonSerializer.getGsonBuilder()}
+   */
+  public static CouchDbClient getCouchDbNodeDataStreamRelayClient() {
+    CouchDbClient relayClient = new CouchDbClient(props("relays"));
+    relayClient.setGsonBuilder(GsonSerializer.getGsonBuilder());
+    return relayClient;
+  }
+
   public static CouchDbClient getCouchDbInternalUsersClient() {
     CouchDbClient dbClient = new CouchDbClient(props("_users"));
     return dbClient;
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index f34d68a..2587943 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,10 +19,10 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2021-02-23 09:17:52.
+// Generated using typescript-generator version 2.27.744 on 2021-02-26 19:57:32.
 
 export class AbstractStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
+    "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
     elementId: string;
 
     static fromData(data: AbstractStreamPipesEntity, target?: AbstractStreamPipesEntity): AbstractStreamPipesEntity {
@@ -119,7 +119,7 @@ export class Accuracy extends EventPropertyQualityDefinition {
 }
 
 export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+    "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
     appId: string;
     applicationLinks: ApplicationLink[];
     connectedTo: string[];
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
         instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
         instance.internallyManaged = data.internallyManaged;
         instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
-        instance.dom = data.dom;
         instance.uri = data.uri;
+        instance.dom = data.dom;
         return instance;
     }
 }
@@ -1824,8 +1824,8 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
         }
         const instance = target || new GenericAdapterSetDescription();
         super.fromData(data, instance);
-        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
@@ -1843,8 +1843,8 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
         }
         const instance = target || new GenericAdapterStreamDescription();
         super.fromData(data, instance);
-        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
@@ -3074,7 +3074,7 @@ export class SpDataSet extends SpDataStream {
 }
 
 export class SpDataStreamRelay extends NamedStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.SpDataStreamRelay";
+    "@class": "org.apache.streampipes.model.eventrelay.SpDataStreamRelay";
     eventGrounding: EventGrounding;
 
     static fromData(data: SpDataStreamRelay, target?: SpDataStreamRelay): SpDataStreamRelay {
diff --git a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
index 4014443..8c27f41 100644
--- a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
+++ b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
@@ -29,7 +29,7 @@ import {Component, Input} from "@angular/core";
 export class PipelineStatusDialogComponent {
 
     statusDetailsVisible: any;
-    elementStati : [[PipelineElementStatus]];
+    elementStati : Array<Array<PipelineElementStatus>>;
 
     @Input()
     pipelineOperationStatus: PipelineOperationStatus;
@@ -40,18 +40,15 @@ export class PipelineStatusDialogComponent {
     }
 
     ngOnInit(){
-        console.log(this.pipelineOperationStatus.elementStatus)
-        let nodes: [String];
-        nodes = [];
+        let nodes = [];
         this.pipelineOperationStatus.elementStatus.forEach(stat => {
             if (!nodes.includes(stat.elementNode)){
                 nodes.push(stat.elementNode)
             }
         })
-        console.log(nodes)
+
         nodes.forEach(node =>{
-            let nodeStati : [PipelineElementStatus];
-            nodeStati = [];
+            let nodeStati = [];
             this.pipelineOperationStatus.elementStatus.forEach(stat =>{
                 if(stat.elementNode == node){
                     nodeStati.push(stat);
@@ -59,7 +56,6 @@ export class PipelineStatusDialogComponent {
             })
             this.elementStati.push(nodeStati);
         })
-        console.log(this.elementStati)
     }
 
     close() {


[incubator-streampipes] 02/02: Merge branch 'dev' into edge-extensions

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 6a7c1e63f777da2292647b983614a830904537f2
Merge: afacf89 ff43f77
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Mar 4 01:42:46 2021 +0100

    Merge branch 'dev' into edge-extensions

 .../src/main/resources/openapi.yaml                |   2 +-
 streampipes-backend/src/main/resources/shiro.ini   |   4 +-
 .../container/master/rest/SourcesResource.java     |   4 +-
 .../template/PipelineElementTemplateConfig.java    |   8 +
 .../monitoring/pipeline/TopicInfoCollector.java    |   4 -
 .../template/PipelineElementTemplateHandler.java   |   7 +-
 .../template/PipelineElementTemplateVisitor.java   |  95 +++--
 .../apache/streampipes/rest/impl/AutoComplete.java |  31 +-
 ...AbstractConfigurablePipelineElementBuilder.java |  18 +-
 .../org/apache/streampipes/vocabulary/Geo.java     |   7 +
 .../apache/streampipes/vocabulary/Geonames.java    |  13 +
 .../java/org/apache/streampipes/vocabulary/SO.java | 439 +++++++++++++++++++++
 .../apache/streampipes/vocabulary/SPSensor.java    |  14 +
 .../{Geonames.java => SemanticTypeRegistry.java}   |  39 +-
 .../event-schema/event-schema.component.html       |   6 +-
 .../adapter-started-dialog.component.html          |  31 +-
 .../adapter-started-dialog.component.scss          |   9 +-
 .../adapter-started-dialog.component.ts            |   4 +-
 .../edit-event-property.component.html             |   9 +-
 .../edit-event-property.component.ts               |  20 +-
 ui/src/app/connect/services/rest.service.ts        |   6 +-
 .../components/grid/dashboard-grid.component.ts    |   3 +
 .../components/panel/dashboard-panel.component.ts  |   4 +-
 .../widgets/table/table-widget.component.css       |   9 +-
 .../widgets/table/table-widget.component.html      |   6 +-
 .../widgets/table/table-widget.component.ts        |   3 +-
 .../pipeline-element-template-generator.ts         |  54 ++-
 .../apis/semantic-types.service.ts                 |  23 +-
 ui/src/app/platform-services/platform.module.ts    |   5 +-
 29 files changed, 774 insertions(+), 103 deletions(-)

diff --cc streampipes-backend/src/main/resources/shiro.ini
index f4c9ed8,3d348a1..76d1046
--- a/streampipes-backend/src/main/resources/shiro.ini
+++ b/streampipes-backend/src/main/resources/shiro.ini
@@@ -67,12 -67,8 +67,10 @@@ securityManager.rememberMeManager.cooki
  /api/v2/users/*/labeling/category/* = anon
  /api/v2/users/*/labeling/* = anon
  /api/v2/users/*/labeling = anon
 +/api/v2/users/*/nodes = anon
 +/api/v2/users/*/nodes/* = anon
  /api/v2/connect/*/master/administration = anon
- /api/v2/connect/*/master/sources/* = anon
- /api/v2/connect/*/master/sources/*/streams = anon
- /api/v2/connect/*/master/sources/*/streams/* = anon
+ /api/v2/connect/*/master/sources/** = anon
  /api/v2/connect/*/master/resolvable/*/configurations = anon
  /api/openapi.json = anon
  /api/** = authcBearer, customFilter
diff --cc ui/src/app/platform-services/platform.module.ts
index 9d90ac7,44c8bea..4bc0738
--- a/ui/src/app/platform-services/platform.module.ts
+++ b/ui/src/app/platform-services/platform.module.ts
@@@ -40,7 -40,7 +41,9 @@@ import {SemanticTypesService} from "./a
      PipelineElementService,
      PipelineMonitoringService,
      PipelineService,
-     NodeService
++    NodeService,
++    PipelineService,
+     SemanticTypesService
    ],
    entryComponents: []
  })