You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/04 00:43:06 UTC

[incubator-streampipes] 01/02: [WIP] refactoring pipeline element migration and node controller sync on rejoin

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit afacf8916c4433f233a34b883dd6dc5209c4eddb
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Mar 4 01:40:02 2021 +0100

    [WIP] refactoring pipeline element migration and node controller sync on rejoin
---
 .../model/{ => eventrelay}/SpDataStreamRelay.java  |   2 +-
 .../SpDataStreamRelayContainer.java                |  58 ++-
 .../model/eventrelay}/metrics/RelayMetrics.java    |   2 +-
 .../model/graph/DataProcessorDescription.java      |   2 +-
 .../model/graph/DataProcessorInvocation.java       |   2 +-
 .../model/message/NotificationType.java            |   5 +-
 .../pipeline/PipelineElementMigrationEntity.java   |  51 ++
 .../org/apache/streampipes/model/util/Cloner.java  |   2 +-
 streampipes-node-controller-container/pom.xml      |   4 +
 ...yResource.java => DataStreamRelayResource.java} |   8 +-
 .../container/api/InvocableEntityResource.java     |  55 +--
 .../api/NodeControllerResourceConfig.java          |   4 +-
 ...ource.java => NodeInfoDescriptionResource.java} |   4 +-
 .../container/management/node/NodeManager.java     |   5 +-
 .../management/pe/InvocableElementManager.java     | 111 +++--
 .../management/pe/PipelineElementLifeCycle.java    |   3 +-
 .../management/relay/DataStreamRelayManager.java   | 108 ++---
 .../container/management/relay/EventRelay.java     |   6 +-
 .../relay/bridges/MultiBrokerBridge.java           |   3 +-
 .../manager/execution/http/ElementSubmitter.java   | 162 +++++++
 .../manager/execution/http/GraphSubmitter.java     | 208 ++++-----
 .../manager/execution/http/HttpRequestBuilder.java |   9 +-
 .../manager/execution/http/MigrationHelpers.java   |  75 ---
 .../manager/execution/http/PipelineExecutor.java   | 513 ---------------------
 .../http/StreamRelayEndpointUrlGenerator.java      |   2 +-
 .../pipeline/AbstractPipelineExecutor.java         | 458 ++++++++++++++++++
 .../execution/pipeline/PipelineExecutor.java       | 110 +++++
 .../pipeline/PipelineMigrationExecutor.java        | 349 ++++++++++++++
 .../pipeline/PipelineMigrationHelpers.java         |  15 +-
 .../{http => pipeline}/PipelineStorageService.java |   2 +-
 .../execution/pipeline/migration/Command.java      |  17 +-
 .../migration/PipelineElementStartCommand.java     |  21 +-
 .../migration/PipelineElementStopCommand.java      |  20 +-
 .../pipeline/migration/RelayStartCommand.java      |  20 +-
 .../pipeline/migration/RelayStopCommand.java       |  20 +-
 .../manager/matching/InvocationGraphBuilder.java   |   2 +-
 .../migration/PipelineElementMigrationHandler.java | 147 ++++++
 .../manager/node/AbstractClusterManager.java       | 112 +++--
 .../manager/node/AvailableNodesFetcher.java        |   1 +
 .../manager/node/NodeClusterManager.java           | 112 ++++-
 .../streampipes/manager/node/NodeSyncOptions.java  |  16 +-
 .../streampipes/manager/operations/Operations.java |  51 +-
 .../org/apache/streampipes/rest/impl/Node.java     |   6 +-
 .../streampipes/rest/impl/PipelineResource.java    |   9 +-
 .../serializers/json/GsonSerializer.java           |   2 +
 .../jsonld/CustomAnnotationProvider.java           |   2 +
 .../serializers/jsonld/JsonLdTransformer.java      |   1 +
 .../streampipes/storage/api/INoSqlStorage.java     |   2 +
 .../storage/api/INodeDataStreamRelay.java          |  22 +-
 .../storage/couchdb/CouchDbStorageManager.java     |   5 +-
 .../couchdb/impl/NodeDataStreamRelayImpl.java      |  76 +++
 .../streampipes/storage/couchdb/utils/Utils.java   |   6 +
 ui/src/app/core-model/gen/streampipes-model.ts     |  14 +-
 .../pipeline-status-dialog.component.ts            |  12 +-
 54 files changed, 1910 insertions(+), 1124 deletions(-)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelay.java
similarity index 98%
rename from streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelay.java
index addbd55..0078b18 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelay.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.model;
+package org.apache.streampipes.model.eventrelay;
 
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelayContainer.java
similarity index 66%
rename from streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelayContainer.java
index cb1cd40..9db3117 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/SpDataStreamRelayContainer.java
@@ -15,12 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.model;
+package org.apache.streampipes.model.eventrelay;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.vocabulary.StreamPipes;
 
@@ -39,6 +43,13 @@ public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
     private static final long serialVersionUID = -4675162465357705480L;
 
     private static final String prefix = "urn:apache.org:relaystreamcontainer:";
+    private static final String RELAY_SUFFIX = "(Stream Relay)";
+
+    @JsonProperty("_id")
+    private @SerializedName("_id") String couchDbId;
+
+    @JsonProperty("_rev")
+    private @SerializedName("_rev") String couchDbRev;
 
     @OneToOne(fetch = FetchType.EAGER,
             cascade = {CascadeType.PERSIST, CascadeType.MERGE})
@@ -77,6 +88,31 @@ public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
         this.eventRelayStrategy = eventRelayStrategy;
     }
 
+    public SpDataStreamRelayContainer(DataProcessorInvocation desc) {
+        super(desc.getElementId());
+        this.setName(makeRelayName(desc.getName()));
+        this.setEventRelayStrategy(desc.getEventRelayStrategy());
+        this.setRunningStreamRelayInstanceId(desc.getDeploymentRunningInstanceId());
+        this.setInputGrounding(new EventGrounding(desc.getOutputStream().getEventGrounding()));
+        this.setOutputStreamRelays(desc.getOutputStreamRelays());
+        this.setDeploymentTargetNodeHostname(desc.getDeploymentTargetNodeHostname());
+        this.setDeploymentTargetNodePort(desc.getDeploymentTargetNodePort());
+        this.setDeploymentTargetNodeId(desc.getDeploymentTargetNodeId());
+    }
+
+    public SpDataStreamRelayContainer(String runningStreamRelayInstanceId, String eventRelayStrategy, SpDataStream desc,
+                                      List<SpDataStreamRelay> dataStreamRelays) {
+        super(desc.getElementId());
+        this.setRunningStreamRelayInstanceId(runningStreamRelayInstanceId);
+        this.setEventRelayStrategy(eventRelayStrategy);
+        this.setName(makeRelayName(desc.getName()));
+        this.setInputGrounding(new EventGrounding(desc.getEventGrounding()));
+        this.setDeploymentTargetNodeId(desc.getDeploymentTargetNodeId());
+        this.setDeploymentTargetNodeHostname(desc.getDeploymentTargetNodeHostname());
+        this.setDeploymentTargetNodePort(desc.getDeploymentTargetNodePort());
+        this.setOutputStreamRelays(dataStreamRelays);
+    }
+
     public SpDataStreamRelayContainer(NamedStreamPipesEntity other) {
         super(other);
     }
@@ -136,4 +172,24 @@ public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
     public void setDeploymentTargetNodePort(Integer deploymentTargetNodePort) {
         this.deploymentTargetNodePort = deploymentTargetNodePort;
     }
+
+    private String makeRelayName(String name) {
+        return name + " " + RELAY_SUFFIX;
+    }
+
+    public String getCouchDbId() {
+        return couchDbId;
+    }
+
+    public void setCouchDbId(String couchDbId) {
+        this.couchDbId = couchDbId;
+    }
+
+    public String getCouchDbRev() {
+        return couchDbRev;
+    }
+
+    public void setCouchDbRev(String couchDbRev) {
+        this.couchDbRev = couchDbRev;
+    }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/metrics/RelayMetrics.java b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/metrics/RelayMetrics.java
similarity index 96%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/metrics/RelayMetrics.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/metrics/RelayMetrics.java
index 6a019ab..e70667a 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/metrics/RelayMetrics.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/eventrelay/metrics/RelayMetrics.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.management.relay.metrics;
+package org.apache.streampipes.model.eventrelay.metrics;
 
 import org.apache.streampipes.model.grounding.TransportProtocol;
 
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java
index ac43c71..cee32d4 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorDescription.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.model.graph;
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
 import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
 import org.apache.streampipes.model.output.OutputStrategy;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index 5c3236b..1e7a4d6 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.model.graph;
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.output.OutputStrategy;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
index 4a1c81e..d7bf388 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/message/NotificationType.java
@@ -68,7 +68,10 @@ public enum NotificationType {
 	INSTALLATION_SUCCESSFUL("Installation successful", ""), 
 	
 	PROPERTY_FILE_WRITTEN("Writing properties file...", ""), 
-	ADMIN_USER_CREATED("Creating admin user...", "");
+	ADMIN_USER_CREATED("Creating admin user...", ""),
+
+	NODE_JOIN_SUCCESS("Success", "Node successfully joined"),
+	NODE_JOIN_ERROR("Error", "Node could not be joined");
 
 
 	private final String title;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementMigrationEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementMigrationEntity.java
new file mode 100644
index 0000000..1973d6e
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementMigrationEntity.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.pipeline;
+
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+public class PipelineElementMigrationEntity {
+
+    private InvocableStreamPipesEntity sourceElement;
+    private InvocableStreamPipesEntity targetElement;
+
+    public PipelineElementMigrationEntity() {
+    }
+
+    public PipelineElementMigrationEntity(InvocableStreamPipesEntity sourceElement,
+                                          InvocableStreamPipesEntity targetElement) {
+        this.sourceElement = sourceElement;
+        this.targetElement = targetElement;
+    }
+
+    public InvocableStreamPipesEntity getSourceElement() {
+        return sourceElement;
+    }
+
+    public void setSourceElement(InvocableStreamPipesEntity sourceElement) {
+        this.sourceElement = sourceElement;
+    }
+
+    public InvocableStreamPipesEntity getTargetElement() {
+        return targetElement;
+    }
+
+    public void setTargetElement(InvocableStreamPipesEntity targetElement) {
+        this.targetElement = targetElement;
+    }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
index 2ac64de..dc92659 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.model.util;
 
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
 import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.model.output.*;
 import org.apache.streampipes.model.resource.Hardware;
diff --git a/streampipes-node-controller-container/pom.xml b/streampipes-node-controller-container/pom.xml
index 4076064..d5e0f49 100644
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@ -131,6 +131,10 @@
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-jaxb-annotations</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataStreamRelayResource.java
similarity index 82%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataStreamRelayResource.java
index 00bb34a..0883a68 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataStreamRelayResource.java
@@ -17,7 +17,7 @@
  */
 package org.apache.streampipes.node.controller.container.api;
 
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 
@@ -25,7 +25,7 @@ import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 
 @Path("/api/v2/node/stream/relay")
-public class AdapterDataStreamRelayResource extends AbstractResource {
+public class DataStreamRelayResource extends AbstractResource {
 
     @POST
     @JacksonSerialized
@@ -33,7 +33,7 @@ public class AdapterDataStreamRelayResource extends AbstractResource {
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     public javax.ws.rs.core.Response invoke(SpDataStreamRelayContainer graph) {
-        return ok(DataStreamRelayManager.getInstance().startAdapterDataStreamRelay(graph));
+        return ok(DataStreamRelayManager.getInstance().start(graph));
     }
 
     @DELETE
@@ -41,6 +41,6 @@ public class AdapterDataStreamRelayResource extends AbstractResource {
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     public javax.ws.rs.core.Response detach(@PathParam("runningInstanceId") String runningInstanceId) {
-        return ok(DataStreamRelayManager.getInstance().stopAdapterDataStreamRelay(runningInstanceId));
+        return ok(DataStreamRelayManager.getInstance().stop(runningInstanceId));
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 57f8c29..6028438 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -21,12 +21,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
 import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
-import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
@@ -34,12 +31,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.*;
-import java.lang.annotation.Annotation;
-import java.net.URI;
-import java.util.Date;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
 
 @Path("/api/v2/node/element")
 public class InvocableEntityResource extends AbstractResource {
@@ -62,26 +53,22 @@ public class InvocableEntityResource extends AbstractResource {
     @Produces(MediaType.APPLICATION_JSON)
     public javax.ws.rs.core.Response invoke(@PathParam("identifier") String identifier,
                                             @PathParam("elementId") String elementId, InvocableStreamPipesEntity graph) {
-        String endpoint;
 
         if (identifier.equals(DATA_PROCESSOR_PREFIX)) {
-            endpoint = graph.getBelongsTo();
-            DataStreamRelayManager.getInstance().startPipelineElementDataStreamRelay((DataProcessorInvocation) graph);
-            Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
-            if (resp.isSuccess()) {
+            Response elementResponse = InvocableElementManager.getInstance().invoke(graph);
+            if (elementResponse.isSuccess()) {
                 RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
             }
-            return ok(resp);
+            return ok(elementResponse);
         }
         // Currently no data sinks are registered at node controller. If we, at some point, want to also run data
         // sinks on edge nodes we need to register there Declarer at the node controller one startup.
         else if (identifier.equals(DATA_SINK_PREFIX)) {
-            endpoint = graph.getBelongsTo();
-            Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
-            if (resp.isSuccess()) {
+            Response elementResponse = InvocableElementManager.getInstance().invoke(graph);
+            if (elementResponse.isSuccess()) {
                 RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
             }
-            return ok(resp);
+            return ok(elementResponse);
         }
 
         return ok();
@@ -96,37 +83,7 @@ public class InvocableEntityResource extends AbstractResource {
         String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
         Response resp = InvocableElementManager.getInstance().detach(endpoint + SLASH + runningInstanceId);
         RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
-        DataStreamRelayManager.getInstance().stopPipelineElementDataStreamRelay(runningInstanceId);
 
         return ok(resp);
     }
-
-
-    @DELETE
-    @Path("{identifier}/{elementId}/{runningInstanceId}/relay")
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response detachRelay(@PathParam("identifier") String identifier,
-                                            @PathParam("elementId") String elementId,
-                                            @PathParam("runningInstanceId") String runningInstanceId) {
-        return DataStreamRelayManager.getInstance().stopDataStreamRelay(runningInstanceId);
-    }
-
-    @POST
-    @Path("{identifier}/{elementId}/{runningInstanceId}/relay")
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response invokeRelay(@PathParam("identifier") String identifier,
-                                @PathParam("elementId") String elementId,
-                                @PathParam("runningInstanceId") String runningInstanceId,
-                                SpDataStreamRelayContainer relay) {
-        return DataStreamRelayManager.getInstance().startDataStreamRelay(relay, runningInstanceId);
-    }
-
-    private String toJson(InvocableStreamPipesEntity graph) {
-        try {
-            return JacksonSerializer.getObjectMapper().writeValueAsString(graph);
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        }
-        throw new SpRuntimeException("Could not serialize object: " + graph);
-    }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
index 5fcd3dc..ac277e8 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
@@ -25,9 +25,9 @@ public class NodeControllerResourceConfig extends ResourceConfig {
 
     public NodeControllerResourceConfig() {
         register(HealthCheckResource.class);
-        register(InfoStatusResource.class);
+        register(NodeInfoDescriptionResource.class);
         register(InvocableEntityResource.class);
-        register(AdapterDataStreamRelayResource.class);
+        register(DataStreamRelayResource.class);
         register(ConnectResource.class);
         register(ContainerResource.class);
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoDescriptionResource.java
similarity index 97%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoDescriptionResource.java
index 7e7b36a..6bbe2f5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoDescriptionResource.java
@@ -28,7 +28,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 @Path("/api/v2/node/info")
-public class InfoStatusResource extends AbstractResource {
+public class NodeInfoDescriptionResource extends AbstractResource {
 
     private static final String ACTIVATE = "activate";
     private static final String DEACTIVATE = "deactivate";
@@ -70,6 +70,6 @@ public class InfoStatusResource extends AbstractResource {
     @Path("/relays")
     @Produces(MediaType.APPLICATION_JSON)
     public Response getAllRelays() {
-        return ok(DataStreamRelayManager.getInstance().getAllRelays());
+        return ok(DataStreamRelayManager.getInstance().getAllRelaysMetrics());
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
index d9ec40d..0f01751 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
@@ -127,13 +127,13 @@ public class NodeManager {
     }
 
     public Message activate() {
-        LOG.info("Deactivate node controller");
+        LOG.info("Activate node controller");
         this.nodeInfo.setActive(true);
         return Notifications.success(NotificationType.OPERATION_SUCCESS);
     }
 
     public Message deactivate() {
-        LOG.info("Activate node controller");
+        LOG.info("Deactivate node controller");
         this.nodeInfo.setActive(false);
         return Notifications.success(NotificationType.OPERATION_SUCCESS);
     }
@@ -151,6 +151,7 @@ public class NodeManager {
         SuccessMessage message = JacksonSerializer
                 .getObjectMapper()
                 .readValue(resp, SuccessMessage.class);
+        LOG.info(message.getNotifications().get(0).getDescription());
         return message.isSuccess();
     }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 4db36b3..9891ee6 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -17,12 +17,16 @@
  */
 package org.apache.streampipes.node.controller.container.management.pe;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.gson.JsonSyntaxException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.node.NodeManager;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
@@ -31,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 
 public class InvocableElementManager implements PipelineElementLifeCycle {
 
@@ -58,50 +63,19 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
 
     @Override
     public void register(InvocableRegistration registration) {
-        try {
-            Request.Put(makeConsulRegistrationEndpoint())
-                    .addHeader("accept", "application/json")
-                    .body(new StringEntity(JacksonSerializer
-                            .getObjectMapper()
-                            .writeValueAsString(registration.getConsulServiceRegistrationBody())))
-                    .execute();
-
-            // TODO: persistent storage to survive failures
-            NodeManager.getInstance()
-                    .retrieveNodeInfoDescription()
-                    .setSupportedElements(registration.getSupportedPipelineElementAppIds());
-
-            String url = "http://"
-                            + NodeControllerConfig.INSTANCE.getBackendHost()
-                            + ":"
-                            + NodeControllerConfig.INSTANCE.getBackendPort()
-                            + "/"
-                            + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes"
-                            + "/"
-                            + NodeControllerConfig.INSTANCE.getNodeControllerId();
-
-            String desc = JacksonSerializer.getObjectMapper()
-                    .writeValueAsString(NodeManager.getInstance().retrieveNodeInfoDescription());
-
-            Request.Put(url)
-                    .bodyString(desc, ContentType.APPLICATION_JSON)
-//                    .connectTimeout(1000)
-//                    .socketTimeout(100000)
-                    .execute();
-
-            LOG.info("Successfully registered pipeline element container");
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+        registerAtConsul(registration);
+        updateAndSyncNodeInfoDescription(registration);
+        LOG.info("Successfully registered pipeline element container");
     }
 
     @Override
-    public Response invoke(String endpoint, String payload) {
+    public Response invoke(InvocableStreamPipesEntity graph) {
+        String endpoint = graph.getBelongsTo();
         LOG.info("Invoke pipeline element: {}", endpoint);
         try {
             org.apache.http.client.fluent.Response httpResp = Request
                     .Post(endpoint)
-                    .bodyString(payload, ContentType.APPLICATION_JSON)
+                    .bodyString(toJson(graph), ContentType.APPLICATION_JSON)
                     .connectTimeout(CONNECT_TIMEOUT)
                     .execute();
             return handleResponse(httpResp);
@@ -129,29 +103,58 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
     @Override
     public void unregister(){
         // TODO: unregister element from Consul and
+        setSupportedPipelineElements(Collections.emptyList());
+        try {
+            String url = generateBackendEndpoint();
+            String desc = toJson(getNodeInfoDescription());
+            Request.Put(url)
+                    .bodyString(desc, ContentType.APPLICATION_JSON)
+                    .execute();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void updateAndSyncNodeInfoDescription(InvocableRegistration registration) {
+        setSupportedPipelineElements(registration.getSupportedPipelineElementAppIds());
+        try {
+            String url = generateBackendEndpoint();
+            String desc = toJson(getNodeInfoDescription());
+            Request.Put(url)
+                    .bodyString(desc, ContentType.APPLICATION_JSON)
+                    .execute();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private NodeInfoDescription getNodeInfoDescription() {
+        return NodeManager.getInstance().retrieveNodeInfoDescription();
+    }
+
+    private void setSupportedPipelineElements(List<String> supportedPipelineElements) {
         NodeManager.getInstance()
                 .retrieveNodeInfoDescription()
-                .setSupportedElements(Collections.emptyList());
+                .setSupportedElements(supportedPipelineElements);
+    }
 
-        String url = "http://"
+    private String generateBackendEndpoint() {
+        return HTTP_PROTOCOL
                 + NodeControllerConfig.INSTANCE.getBackendHost()
-                + ":"
+                + COLON
                 + NodeControllerConfig.INSTANCE.getBackendPort()
-                + "/"
+                + SLASH
                 + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes"
-                + "/"
+                + SLASH
                 + NodeControllerConfig.INSTANCE.getNodeControllerId();
+    }
 
+    private void registerAtConsul(InvocableRegistration registration) {
         try {
-            String desc = JacksonSerializer.getObjectMapper()
-                    .writeValueAsString(NodeManager.getInstance().retrieveNodeInfoDescription());
-
-            Request.Put(url)
-                    .bodyString(desc, ContentType.APPLICATION_JSON)
-                    .connectTimeout(1000)
-                    .socketTimeout(100000)
+            Request.Put(makeConsulRegistrationEndpoint())
+                    .addHeader("accept", "application/json")
+                    .body(new StringEntity(toJson(registration.getConsulServiceRegistrationBody())))
                     .execute();
-
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -183,4 +186,12 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
         }
     }
 
+    private <T> String toJson(T element) {
+        try {
+            return JacksonSerializer.getObjectMapper().writeValueAsString(element);
+        } catch (JsonProcessingException e) {
+            throw new SpRuntimeException("Could not serialize object: " + element, e);
+        }
+    }
+
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
index 0ad32ee..107c717 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
@@ -19,12 +19,13 @@ package org.apache.streampipes.node.controller.container.management.pe;
 
 import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
 public interface PipelineElementLifeCycle {
 
     void register(InvocableRegistration registration);
 
-    Response invoke(String endpoint, String payload);
+    Response invoke(InvocableStreamPipesEntity graph);
 
     Response detach(String runningInstanceId);
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
index 638e9bc..0fbe538 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
@@ -17,16 +17,17 @@
  */
 package org.apache.streampipes.node.controller.container.management.relay;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.SpDataStreamRelay;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.grounding.TransportProtocol;
-import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
+import org.apache.streampipes.model.eventrelay.metrics.RelayMetrics;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+
+import java.util.*;
 import java.util.stream.Collectors;
 
 public class DataStreamRelayManager {
@@ -45,80 +46,65 @@ public class DataStreamRelayManager {
         return instance;
     }
 
-    public Response startAdapterDataStreamRelay(SpDataStreamRelayContainer desc) {
-        String strategy = desc.getEventRelayStrategy();
-        String runningInstanceId = desc.getRunningStreamRelayInstanceId();
-        TransportProtocol source = desc.getInputGrounding().getTransportProtocol();
-
-        Map<String, EventRelay> eventRelayMap = new HashMap<>();
-
-        desc.getOutputStreamRelays().forEach(r -> {
-            TransportProtocol target = r.getEventGrounding().getTransportProtocol();
-            EventRelay eventRelay = new EventRelay(source, target, strategy);
-            eventRelay.start();
-            eventRelayMap.put(r.getElementId(), eventRelay);
-        });
-        RunningRelayInstances.INSTANCE.add(desc.getRunningStreamRelayInstanceId(), eventRelayMap);
-        return new Response(runningInstanceId,true,"");
-    }
-
-    public Response stopAdapterDataStreamRelay(String id) {
-        return stopDataStreamRelay(id);
+    public Response start(InvocableStreamPipesEntity graph) {
+        return start(convert(graph));
     }
 
-
-    public Response startDataStreamRelay(SpDataStreamRelayContainer desc, String id) {
+    public Response start(SpDataStreamRelayContainer desc) {
         String strategy = desc.getEventRelayStrategy();
+        String id = desc.getRunningStreamRelayInstanceId();
         TransportProtocol source = desc.getInputGrounding().getTransportProtocol();
 
         Map<String, EventRelay> eventRelayMap = new HashMap<>();
 
-        desc.getOutputStreamRelays().forEach(r -> {
-            TransportProtocol target = r.getEventGrounding().getTransportProtocol();
-            EventRelay eventRelay = new EventRelay(source, target, strategy);
-            eventRelay.start();
-            eventRelayMap.put(r.getElementId(), eventRelay);
+        // start data stream relay
+        // 1:1 mapping -> remote forward
+        // 1:n mapping -> remote fan-out
+        desc.getOutputStreamRelays().forEach(relay -> {
+            // add new relay if not running
+            if(!runningRelayExists(id, relay)) {
+                TransportProtocol target = relay.getEventGrounding().getTransportProtocol();
+                EventRelay eventRelay = new EventRelay(source, target, strategy);
+                eventRelay.start();
+                eventRelayMap.put(relay.getElementId(), eventRelay);
+            }
         });
-        RunningRelayInstances.INSTANCE.add(id, eventRelayMap);
+        RunningRelayInstances.INSTANCE.add(desc.getRunningStreamRelayInstanceId(), eventRelayMap);
         return new Response(id,true,"");
     }
 
-    public Response stopDataStreamRelay(String id) {
-        Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(id);
-        if (relay != null) {
-            relay.values().forEach(EventRelay::stop);
+    public Response stop(String id) {
+        Map<String, EventRelay> relays = RunningRelayInstances.INSTANCE.get(id);
+        if (relays != null) {
+            relays.values().forEach(EventRelay::stop);
         }
         RunningRelayInstances.INSTANCE.remove(id);
         return new Response(id, true, "");
     }
 
-    public void startPipelineElementDataStreamRelay(DataProcessorInvocation graph) {
-        TransportProtocol source = graph
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocol();
-
-        String strategy = graph.getEventRelayStrategy();
-        Map<String, EventRelay> eventRelayMap = new HashMap<>();
-
-        List<SpDataStreamRelay> dataStreamRelays = graph.getOutputStreamRelays();
-        dataStreamRelays.forEach(r -> {
-            TransportProtocol target = r.getEventGrounding().getTransportProtocol();
-            EventRelay eventRelay = new EventRelay(source, target, strategy);
-            eventRelay.start();
-            eventRelayMap.put(r.getElementId(), eventRelay);
-        });
-        RunningRelayInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), eventRelayMap);
-    }
-
-    public void stopPipelineElementDataStreamRelay(String id) {
-        stopDataStreamRelay(id);
-    }
-
-    public List<RelayMetrics> getAllRelays() {
+    public List<RelayMetrics> getAllRelaysMetrics() {
        return RunningRelayInstances.INSTANCE.getRunningInstances()
                     .stream()
                     .map(EventRelay::getRelayMetrics)
                     .collect(Collectors.toList());
     }
+
+    private boolean runningRelayExists(String id, SpDataStreamRelay relay) {
+        Map<String, EventRelay> relays = RunningRelayInstances.INSTANCE.get(id);
+        if (relays != null) {
+            return relays.keySet().stream()
+                    .anyMatch(eventRelay -> eventRelay.equals(relay.getElementId()));
+        } else {
+            return false;
+        }
+    }
+
+    // Helpers
+
+    private SpDataStreamRelayContainer convert(InvocableStreamPipesEntity graph) {
+        if (graph instanceof DataProcessorInvocation) {
+            return new SpDataStreamRelayContainer((DataProcessorInvocation) graph);
+        }
+        throw new SpRuntimeException("Could not convert pipeline element description to data stream relay");
+    }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
index 7c52867..f22a815 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
@@ -19,7 +19,7 @@ package org.apache.streampipes.node.controller.container.management.relay;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.grounding.*;
-import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
+import org.apache.streampipes.model.eventrelay.metrics.RelayMetrics;
 
 public class EventRelay extends BaseEventRelay {
 
@@ -29,11 +29,11 @@ public class EventRelay extends BaseEventRelay {
         super(source, target, DEFAULT_EVENT_RELAY_STRATEGY);
     }
 
-    public EventRelay(TransportProtocol source, TransportProtocol target, String relayStrategy) {
+    public EventRelay(TransportProtocol source, TransportProtocol target, String relayStrategy)  {
         super(source, target, relayStrategy);
     }
 
-    public void start() throws SpRuntimeException{
+    public void start() throws SpRuntimeException {
         this.multiBrokerBridge.start();
     }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
index cc7c0c5..09b9057 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
@@ -21,10 +21,9 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.messaging.EventConsumer;
 import org.apache.streampipes.messaging.EventProducer;
 import org.apache.streampipes.messaging.EventRelay;
-import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.grounding.*;
+import org.apache.streampipes.model.eventrelay.metrics.RelayMetrics;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
 import org.apache.streampipes.sdk.helpers.Tuple3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
new file mode 100644
index 0000000..57715b9
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+public abstract class ElementSubmitter {
+    protected final static Logger LOG = LoggerFactory.getLogger(ElementSubmitter.class);
+
+    protected final String pipelineId;
+    protected final String pipelineName;
+    protected final List<InvocableStreamPipesEntity> graphs;
+    protected final List<SpDataSet> dataSets;
+    protected final List<SpDataStreamRelayContainer> relays;
+
+    public ElementSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
+                            List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> relays) {
+        this.pipelineId = pipelineId;
+        this.pipelineName = pipelineName;
+        this.graphs = graphs;
+        this.dataSets = dataSets;
+        this.relays = relays;
+    }
+
+    protected abstract PipelineOperationStatus invokePipelineElementsAndRelays();
+
+    protected abstract PipelineOperationStatus detachPipelineElementsAndRelays();
+
+    protected abstract PipelineOperationStatus invokeRelaysOnMigration();
+
+    protected abstract PipelineOperationStatus detachRelaysOnMigration();
+
+
+    protected void invokePipelineElements(PipelineOperationStatus status) {
+        graphs.forEach(graph ->
+                status.addPipelineElementStatus(makeInvokeHttpRequest(new InvocableEntityUrlGenerator(graph), graph)));
+    }
+
+    protected void detachPipelineElements(PipelineOperationStatus status) {
+        graphs.forEach(graph ->
+                status.addPipelineElementStatus(makeDetachHttpRequest(new InvocableEntityUrlGenerator(graph), graph)));
+    }
+
+    protected void invokeDataSets(PipelineOperationStatus status) {
+        dataSets.forEach(dataset ->
+                status.addPipelineElementStatus(makeInvokeHttpRequest(new DataSetEntityUrlGenerator(dataset), dataset)));
+    }
+
+    protected void detachDataSets(PipelineOperationStatus status) {
+        dataSets.forEach(dataset ->
+                status.addPipelineElementStatus(makeDetachHttpRequest(new DataSetEntityUrlGenerator(dataset), dataset)));
+    }
+
+    protected void invokeRelays(PipelineOperationStatus status) {
+        relays.forEach(relay ->
+                status.addPipelineElementStatus(makeInvokeHttpRequest(new StreamRelayEndpointUrlGenerator(relay), relay)));
+    }
+
+    protected void detachRelays(PipelineOperationStatus status) {
+        relays.forEach(relay ->
+                status.addPipelineElementStatus(makeDetachHttpRequest(new StreamRelayEndpointUrlGenerator(relay), relay)));
+    }
+
+
+    protected PipelineElementStatus makeInvokeHttpRequest(EndpointUrlGenerator<?> urlGenerator,
+                                                        NamedStreamPipesEntity entity) {
+        if (entity instanceof SpDataStreamRelay) {
+            // data stream relays
+            return new HttpRequestBuilder(entity, urlGenerator.generateRelayEndpoint()).invoke();
+        } else {
+            // data sets, data processors, data sinks
+            return new HttpRequestBuilder(entity, urlGenerator.generateInvokeEndpoint()).invoke();
+        }
+    }
+
+    protected PipelineElementStatus makeDetachHttpRequest(EndpointUrlGenerator<?> urlGenerator,
+                                                        NamedStreamPipesEntity entity) {
+        if (entity instanceof SpDataStreamRelay) {
+            // data stream relays
+            return new HttpRequestBuilder(entity, urlGenerator.generateRelayEndpoint()).detach();
+        } else {
+            // data sets, data processors, data sinks
+            return new HttpRequestBuilder(entity, urlGenerator.generateDetachEndpoint()).detach();
+        }
+    }
+
+    protected PipelineOperationStatus verifyPipelineOperationStatus(PipelineOperationStatus status, String successMessage,
+                                                                  String errorMessage, boolean rollbackIfFailed) {
+        status.setSuccess(status.getElementStatus().stream()
+                .allMatch(PipelineElementStatus::isSuccess));
+
+        if (status.isSuccess()) {
+            status.setTitle(successMessage);
+        } else {
+            if (rollbackIfFailed) {
+                LOG.info("Could not start pipeline, initializing rollback...");
+                rollbackInvokedEntities(status);
+            }
+            status.setTitle(errorMessage);
+        }
+        return status;
+    }
+
+    private void rollbackInvokedEntities(PipelineOperationStatus status) {
+        for (PipelineElementStatus s : status.getElementStatus()) {
+            if (s.isSuccess()) {
+                Optional<InvocableStreamPipesEntity> graphs = findGraphs(s.getElementId());
+                graphs.ifPresent(graph -> {
+                    LOG.info("Rolling back element " + graph.getElementId());
+                    makeDetachHttpRequest(new InvocableEntityUrlGenerator(graph), graph);
+                });
+            }
+        }
+    }
+
+    // Helper methods
+
+    protected PipelineOperationStatus initPipelineOperationStatus() {
+        PipelineOperationStatus status = new PipelineOperationStatus();
+        status.setPipelineId(pipelineId);
+        status.setPipelineName(pipelineName);
+        return status;
+    }
+
+    private Optional<InvocableStreamPipesEntity> findGraphs(String elementId) {
+        return graphs.stream().filter(i -> i.getBelongsTo().equals(elementId)).findFirst();
+    }
+
+    protected boolean allInvocableEntitiesRunning(PipelineOperationStatus status) {
+        return status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess);
+    }
+
+    protected boolean relaysExist() {
+        return relays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0);
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index ab14532..726d1a6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -18,88 +18,57 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
-public class GraphSubmitter {
+public class GraphSubmitter extends ElementSubmitter {
 
-  private final static Logger LOG = LoggerFactory.getLogger(GraphSubmitter.class);
-
-  private final List<InvocableStreamPipesEntity> graphs;
-  private final List<SpDataSet> dataSets;
-  private final String pipelineId;
-  private final String pipelineName;
-  private final List<SpDataStreamRelayContainer> streamRelays;
-
-  public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
-                        List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> streamRelays) {
-    this.graphs = graphs;
-    this.pipelineId = pipelineId;
-    this.pipelineName = pipelineName;
-    this.dataSets = dataSets;
-    this.streamRelays = streamRelays;
+  public GraphSubmitter(String pipelineId, String pipelineName) {
+    super(pipelineId, pipelineName, new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
   }
 
-
-  public PipelineOperationStatus invokeRelays(Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays) {
-    PipelineOperationStatus status = initPipelineOperationStatus();
-
-    relays.entrySet().forEach(e -> {
-      if(e.getValue().getOutputStreamRelays().size() > 0){
-          if(e.getKey() instanceof DataProcessorInvocation)
-            status.addPipelineElementStatus(makeHttpRequest(new InvocableEntityUrlGenerator((DataProcessorInvocation) e.getKey()), e.getValue(), "invokeRelay"));
-          else
-            status.addPipelineElementStatus(makeHttpRequest(new StreamRelayEndpointUrlGenerator(e.getValue()), e.getValue(), "invoke"));
-      }
-    });
-
-    return verifyPipelineOperationStatus(
-            status,
-            "Successfully started relays in pipeline " + pipelineName,
-            "Could not start relays in pipeline" + pipelineName,
-            true);
+  public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs) {
+    super(pipelineId, pipelineName, graphs, new ArrayList<>(), new ArrayList<>());
   }
 
-  public PipelineOperationStatus detachRelays(Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays) {
-    PipelineOperationStatus status = initPipelineOperationStatus();
-
-    relays.entrySet().forEach(e -> {
-      if(e.getValue().getOutputStreamRelays().size() > 0){
-        if(e.getKey() instanceof DataProcessorInvocation)
-          status.addPipelineElementStatus(makeHttpRequest(new InvocableEntityUrlGenerator((DataProcessorInvocation) e.getKey()), e.getValue(), "detachRelay"));
-        else
-          status.addPipelineElementStatus(makeHttpRequest(new StreamRelayEndpointUrlGenerator(e.getValue()), e.getValue(), "detach"));
-      }
-    });
-
-    return verifyPipelineOperationStatus(
-            status,
-            "Successfully stopped relays in pipeline " + pipelineName,
-            "Could not stop all relays in pipeline " + pipelineName,
-            false);
+  public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
+                        List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> relays) {
+    super(pipelineId, pipelineName, graphs, dataSets, relays);
   }
 
-  public PipelineOperationStatus invokeGraphs() {
+  /**
+   * Called when pipeline is started. This invokes all pipeline elements including relays between adjacent pipeline
+   * element nodes in the pipeline graph. The steps are performed in the following order:
+   *
+   * - start relays (if any)
+   * - start pipeline elements
+   * - start data sets (only if pipeline elements and relays started)
+   *
+   * We verify the status of this procedure and trigger a graceful rollback in case of any failure. In any case, we
+   * return an appropriate success/error message.
+   *
+   * @return PipelineOperationStatus
+   */
+  @Override
+  public PipelineOperationStatus invokePipelineElementsAndRelays() {
     PipelineOperationStatus status = initPipelineOperationStatus();
 
-    if (streamRelays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0)) {
-      streamRelays.forEach(streamRelay -> invoke(new StreamRelayEndpointUrlGenerator(streamRelay), streamRelay, status));
+    if (relaysExist()) {
+      invokeRelays(status);
     }
-    graphs.forEach(graph -> invoke(new InvocableEntityUrlGenerator(graph), graph, status));
-    // only invoke datasets when following pipeline elements are started
+
+    invokePipelineElements(status);
+
     if (allInvocableEntitiesRunning(status)) {
-        dataSets.forEach(dataset -> invoke(new DataSetEntityUrlGenerator(dataset), dataset, status));
+      invokeDataSets(status);
     }
 
     return verifyPipelineOperationStatus(
@@ -109,13 +78,27 @@ public class GraphSubmitter {
             true);
   }
 
-  public PipelineOperationStatus detachGraphs() {
+  /**
+   * Called when pipeline is stopped. This detaches all pipeline elements including relays between adjacent pipeline
+   * element nodes in the pipeline graph. The steps are performed in the following order:
+   *
+   * - stop pipeline elements
+   * - stop data sets
+   * - stop relays (if any)
+   *
+   * We verify the status of this procedure and return the success/error message.
+   *
+   * @return PipelineOperationStatus
+   */
+  @Override
+  public PipelineOperationStatus detachPipelineElementsAndRelays() {
     PipelineOperationStatus status = initPipelineOperationStatus();
 
-    graphs.forEach(graph -> detach(new InvocableEntityUrlGenerator(graph), graph, status));
-    dataSets.forEach(dataset -> detach(new DataSetEntityUrlGenerator(dataset), dataset, status));
-    if (streamRelays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0)) {
-      streamRelays.forEach(streamRelay -> detach(new StreamRelayEndpointUrlGenerator(streamRelay), streamRelay, status));
+    detachPipelineElements(status);
+    detachDataSets(status);
+
+    if (relaysExist()) {
+      detachRelays(status);
     }
 
     return verifyPipelineOperationStatus(
@@ -125,74 +108,41 @@ public class GraphSubmitter {
             false);
   }
 
-  private PipelineOperationStatus verifyPipelineOperationStatus(PipelineOperationStatus status, String successMessage,
-                                             String errorMessage, boolean rollbackIfFailed) {
-    status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
-    if (status.isSuccess()) {
-      status.setTitle(successMessage);
-    } else {
-      if (rollbackIfFailed) {
-        LOG.info("Could not start pipeline, initializing rollback...");
-        rollbackInvokedEntities(status);
-      }
-      status.setTitle(errorMessage);
-    }
-    return status;
-  }
-
-  private void rollbackInvokedEntities(PipelineOperationStatus status) {
-    for (PipelineElementStatus s : status.getElementStatus()) {
-      if (s.isSuccess()) {
-        Optional<InvocableStreamPipesEntity> graphs = findGraphs(s.getElementId());
-        graphs.ifPresent(graph -> {
-          LOG.info("Rolling back element " + graph.getElementId());
-          makeHttpRequest(new InvocableEntityUrlGenerator(graph), graph, "detach");
-        });
-      }
-    }
-  }
+  /**
+   * Called when pipeline elements are migrated and new relays need to be invoked between adjacent pipeline elements.
+   *
+   * @return the PipelineOperationStatus of the relay invocations
+   */
+  @Override
+  public PipelineOperationStatus invokeRelaysOnMigration() {
+    PipelineOperationStatus status = initPipelineOperationStatus();
 
-  private void invoke(EndpointUrlGenerator<?> urlGenerator,
-                      NamedStreamPipesEntity namedEntity, PipelineOperationStatus status) {
-    status.addPipelineElementStatus(makeHttpRequest(urlGenerator, namedEntity, "invoke"));
-  }
+    invokeRelays(status);
 
-  private void detach(EndpointUrlGenerator<?> urlGenerator,
-                      NamedStreamPipesEntity namedEntity, PipelineOperationStatus status) {
-    status.addPipelineElementStatus(makeHttpRequest(urlGenerator, namedEntity, "detach"));
+    return verifyPipelineOperationStatus(
+            status,
+            "Successfully started relays in pipeline " + pipelineName,
+            "Could not start relays in pipeline " + pipelineName,
+            true);
   }
 
-  // Helper methods
-
-  private PipelineOperationStatus initPipelineOperationStatus() {
-    PipelineOperationStatus status = new PipelineOperationStatus();
-    status.setPipelineId(pipelineId);
-    status.setPipelineName(pipelineName);
-    return status;
-  }
+  /**
+   * Called when pipeline elements are migrated and existing relays need to be detached, e.g. those from predecessors
+   * to the old pipeline element
+   *
+   * @return PipelineOperationStatus
+   */
+  @Override
+  public PipelineOperationStatus detachRelaysOnMigration() {
+    PipelineOperationStatus status = initPipelineOperationStatus();
 
-  private PipelineElementStatus makeHttpRequest(EndpointUrlGenerator<?> urlGenerator,
-                                                NamedStreamPipesEntity namedEntity, String type) {
-    switch (type) {
-      case "invoke":
-        return new HttpRequestBuilder(namedEntity, urlGenerator.generateInvokeEndpoint()).invoke();
-      case "detach":
-        return new HttpRequestBuilder(namedEntity, urlGenerator.generateDetachEndpoint()).detach();
-      case "invokeRelay":
-        return new HttpRequestBuilder(namedEntity, urlGenerator.generateRelayEndpoint()).invoke();
-      case "detachRelay":
-        return new HttpRequestBuilder(namedEntity, urlGenerator.generateRelayEndpoint()).detach();
-      default:
-        throw new IllegalArgumentException("Type not known: " + type);
-    }
-  }
+    detachRelays(status);
 
-  private Optional<InvocableStreamPipesEntity> findGraphs(String elementId) {
-    return graphs.stream().filter(i -> i.getBelongsTo().equals(elementId)).findFirst();
+    return verifyPipelineOperationStatus(
+            status,
+            "Successfully stopped relays in pipeline " + pipelineName,
+            "Could not stop all relays in pipeline " + pipelineName,
+            false);
   }
 
-  private boolean allInvocableEntitiesRunning(PipelineOperationStatus status) {
-    return status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess);
-  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index 7097577..dc0fdd9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@ -18,21 +18,20 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.gson.Gson;
+
 import com.google.gson.JsonSyntaxException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
-import org.apache.streampipes.commons.Utils;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
-import org.apache.streampipes.serializers.jsonld.JsonLdTransformer;
+
 
 import java.io.IOException;
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/MigrationHelpers.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/MigrationHelpers.java
deleted file mode 100644
index 43aa454..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/MigrationHelpers.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public class MigrationHelpers {
-
-    static public List<Tuple2<DataProcessorInvocation, DataProcessorInvocation>> getDelta(Pipeline pipelineX, Pipeline pipelineY){
-        List<Tuple2<DataProcessorInvocation, DataProcessorInvocation>> delta = new ArrayList<>();
-        pipelineX.getSepas().forEach(iX -> {
-            if (pipelineY.getSepas().stream().filter(iY -> iY.getElementId().equals(iX.getElementId()))
-                    .noneMatch(iY -> iY.getDeploymentTargetNodeId().equals(iX.getDeploymentTargetNodeId()))){
-                Optional<DataProcessorInvocation> invocationY = pipelineY.getSepas().stream().
-                        filter(iY -> iY.getDeploymentRunningInstanceId().equals(iX.getDeploymentRunningInstanceId())).findFirst();
-                invocationY.ifPresent(dataProcessorInvocation -> delta.add(new Tuple2<>(iX, dataProcessorInvocation)));
-            }
-        });
-        return delta;
-    }
-
-    static public PipelineOperationStatus verifyPipelineOperationStatus(PipelineOperationStatus status, String successMessage,
-                                                                         String errorMessage) {
-        //Duplicate from method in GraphSubmitter
-        status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-        if (status.isSuccess()) {
-            status.setTitle(successMessage);
-        } else {
-            status.setTitle(errorMessage);
-        }
-        return status;
-    }
-
-    public static void exchangePipelineElement(Pipeline exchangePipeline, Tuple2<? extends NamedStreamPipesEntity, ? extends NamedStreamPipesEntity> entity){
-
-        if (entity.a instanceof DataProcessorInvocation){
-            int index = exchangePipeline.getSepas().indexOf(entity.b);
-            exchangePipeline.getSepas().remove(index);
-            exchangePipeline.getSepas().add(index, (DataProcessorInvocation) entity.a);
-        }
-    }
-
-    public static Pipeline deepCopyPipeline(Pipeline object) throws JsonProcessingException {
-        ObjectMapper objectMapper = new ObjectMapper();
-        return objectMapper.readValue(objectMapper.writeValueAsString(object), Pipeline.class);
-    }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
deleted file mode 100644
index 11ecbf4..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
-import org.apache.streampipes.manager.data.PipelineGraphHelpers;
-import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.grounding.EventGrounding;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
-import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.user.management.encryption.CredentialsManager;
-import org.lightcouch.DocumentConflictException;
-
-import java.security.GeneralSecurityException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-public class PipelineExecutor {
-
-  private Pipeline pipeline;
-  private boolean visualize;
-  private boolean storeStatus;
-  private boolean monitor;
-
-  public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
-                          boolean monitor) {
-    this.pipeline = pipeline;
-    this.visualize = visualize;
-    this.storeStatus = storeStatus;
-    this.monitor = monitor;
-  }
-
-  public PipelineOperationStatus startPipeline() {
-
-    pipeline.getSepas().forEach(this::updateGroupIds);
-    pipeline.getActions().forEach(this::updateGroupIds);
-
-    List<DataProcessorInvocation> sepas = pipeline.getSepas();
-    List<DataSinkInvocation> secs = pipeline.getActions();
-
-    List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
-            SpDataSet((SpDataSet) s)).collect(Collectors.toList());
-
-    for (SpDataSet ds : dataSets) {
-      ds.setCorrespondingPipeline(pipeline.getPipelineId());
-    }
-
-    List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
-    graphs.addAll(sepas);
-    graphs.addAll(secs);
-
-    List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
-
-    graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
-
-    List<SpDataStreamRelayContainer> dataStreamRelayContainers = generateDataStreamRelays(decryptedGraphs);
-
-    PipelineOperationStatus status = new GraphSubmitter(
-            pipeline.getPipelineId(),
-            pipeline.getName(),
-            decryptedGraphs,
-            dataSets,
-            dataStreamRelayContainers).invokeGraphs();
-
-    if (status.isSuccess()) {
-      storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-
-      PipelineStatusManager.addPipelineStatus(
-              pipeline.getPipelineId(),
-              new PipelineStatusMessage(pipeline.getPipelineId(),
-                      System.currentTimeMillis(),
-                      PipelineStatusMessageType.PIPELINE_STARTED.title(),
-                      PipelineStatusMessageType.PIPELINE_STARTED.description()));
-
-      if (storeStatus) {
-        setPipelineStarted(pipeline);
-      }
-    }
-    return status;
-  }
-
-  public PipelineOperationStatus stopPipeline() {
-    List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
-    List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
-
-    List<SpDataStreamRelayContainer> dataStreamRelayContainers = generateDataStreamRelays(graphs);
-
-    PipelineOperationStatus status = new GraphSubmitter(
-            pipeline.getPipelineId(),
-            pipeline.getName(),
-            graphs,
-            dataSets,
-            dataStreamRelayContainers).detachGraphs();
-
-    if (status.isSuccess()) {
-      if (visualize) {
-        StorageDispatcher
-                .INSTANCE
-                .getNoSqlStore()
-                .getVisualizationStorageApi()
-                .deleteVisualization(pipeline.getPipelineId());
-      }
-      if (storeStatus) {
-        setPipelineStopped(pipeline);
-      }
-
-      PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
-              new PipelineStatusMessage(pipeline.getPipelineId(),
-                      System.currentTimeMillis(),
-                      PipelineStatusMessageType.PIPELINE_STOPPED.title(),
-                      PipelineStatusMessageType.PIPELINE_STOPPED.description()));
-
-    }
-    return status;
-  }
-
-  public PipelineOperationStatus migratePipelineElement(Pipeline currentPipeline,
-                                                         Tuple2<DataProcessorInvocation, DataProcessorInvocation> targetElement){
-    PipelineOperationStatus status = initPipelineOperationStatus();
-    //Purge existing relays
-    pipeline.getSepas().forEach(s -> s.setOutputStreamRelays(new ArrayList<>()));
-    //Generate new relays
-    PipelineGraph newPipelineGraph = new PipelineGraphBuilder(pipeline).buildGraph();
-    new InvocationGraphBuilder(newPipelineGraph, pipeline.getPipelineId()).buildGraphs();
-    //Get predecessors
-    List<NamedStreamPipesEntity> predecessors = new ArrayList<>();
-    PipelineGraph currentPipelineGraph = new PipelineGraphBuilder(currentPipeline).buildGraph();
-    PipelineGraphHelpers
-            .findStreams(newPipelineGraph)
-            .forEach(source -> predecessors.addAll(getPredecessors(source, targetElement.a, newPipelineGraph, new ArrayList<>())));
-    //Find counterpart for predecessors in currentPipeline
-    List<Tuple2<NamedStreamPipesEntity, NamedStreamPipesEntity>> matchingPredecessors = new ArrayList<>();
-    for(NamedStreamPipesEntity e : predecessors){
-      matchingPredecessors.add(new Tuple2<>(e, findMatching(e, currentPipelineGraph)));
-    }
-    List<NamedStreamPipesEntity> currentPredecessors = matchingPredecessors.stream().map(t -> t.b).collect(Collectors.toList());
-
-
-    PipelineOperationStatus statusStartTarget = startPipelineElement(Collections.singletonList(targetElement.a));
-    statusStartTarget.getElementStatus().forEach(status::addPipelineElementStatus);
-    if(!statusStartTarget.isSuccess()){
-      //Target PE could not be started; nothing to roll back
-      return status;
-    }
-    //Stop relay to origin
-    PipelineOperationStatus statusStopRelays = stopRelays(currentPredecessors, targetElement.b);
-    statusStopRelays.getElementStatus().forEach(status::addPipelineElementStatus);
-    if(!statusStopRelays.isSuccess()){
-      //Rollback of target PE and all successfully stopped Relays
-      stopPipelineElements(Collections.singletonList(targetElement.a))
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-
-      Set<String> relaysToRollBack = statusStopRelays.getElementStatus().stream().filter(PipelineElementStatus::isSuccess)
-              .map(PipelineElementStatus::getElementId).collect(Collectors.toSet());
-      Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> rollbackRelays = findRelays(currentPredecessors, targetElement.b)
-              .entrySet()
-              .stream()
-              .filter(entry -> relaysToRollBack.contains(entry.getValue().getRunningStreamRelayInstanceId()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-      new GraphSubmitter(pipeline.getPipelineId(),
-              pipeline.getName(),  new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).invokeRelays(rollbackRelays)
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-      return status;
-    }
-    //start Relay to target
-    PipelineOperationStatus statusStartRelays = startRelays(predecessors, targetElement.a);
-    statusStartRelays.getElementStatus().forEach(status::addPipelineElementStatus);
-    if(!statusStartRelays.isSuccess()){
-      //Rollback target PE, stopped relays and all successfully started Relays
-      stopPipelineElements(Collections.singletonList(targetElement.a))
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-
-      startRelays(currentPredecessors, targetElement.b)
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-
-      Set<String> relaysToRollBack = statusStartRelays.getElementStatus().stream().filter(PipelineElementStatus::isSuccess)
-              .map(PipelineElementStatus::getElementId).collect(Collectors.toSet());
-      Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> rollbackRelays = findRelays(predecessors, targetElement.a)
-              .entrySet()
-              .stream()
-              .filter(entry -> relaysToRollBack.contains(entry.getValue().getRunningStreamRelayInstanceId()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-      new GraphSubmitter(pipeline.getPipelineId(),
-              pipeline.getName(),  new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).detachRelays(rollbackRelays)
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-      return status;
-    }
-    //Stop origin and associated relay
-    PipelineOperationStatus statusStopOrigin = stopPipelineElements(Collections.singletonList(targetElement.b));
-    statusStopOrigin.getElementStatus().forEach(status::addPipelineElementStatus);
-
-    if(!statusStopOrigin.isSuccess()){
-      //Rollback everything
-      stopRelays(predecessors, targetElement.a)
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-      startRelays(currentPredecessors, targetElement.b)
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-      stopPipelineElements(Collections.singletonList(targetElement.a))
-              .getElementStatus().forEach(status::addPipelineElementStatus);
-      return status;
-    }
-
-    List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
-    graphs.addAll(pipeline.getActions());
-    graphs.addAll(pipeline.getSepas());
-    List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
-            SpDataSet((SpDataSet) s)).collect(Collectors.toList());
-    storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
-
-    status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-    return status;
-  }
-
-  private PipelineOperationStatus stopPipelineElements(List<NamedStreamPipesEntity> graphs){
-    List<InvocableStreamPipesEntity> invocations = new ArrayList<>();
-    graphs.stream().filter(i -> i instanceof InvocableStreamPipesEntity).forEach(i -> invocations.add((InvocableStreamPipesEntity) i));
-
-    if (invocations.isEmpty()) {
-      PipelineOperationStatus ret = initPipelineOperationStatus();
-      ret.setSuccess(true);
-      return ret;
-    }
-    return new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(),  invocations, new ArrayList<>(), new ArrayList<>())
-            .detachGraphs();
-  }
-
-  private PipelineOperationStatus startPipelineElement(List<NamedStreamPipesEntity> graphs){
-    List<InvocableStreamPipesEntity> invocations = new ArrayList<>();
-    graphs.stream().filter(i -> i instanceof InvocableStreamPipesEntity).forEach(i -> invocations.add((InvocableStreamPipesEntity) i));
-
-    if (invocations.isEmpty()) {
-      PipelineOperationStatus ret = initPipelineOperationStatus();
-      ret.setSuccess(true);
-      return ret;
-    }
-    List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(invocations);
-    return new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(), decryptedGraphs, new ArrayList<>(), new ArrayList<>())
-            .invokeGraphs();
-  }
-
-  private PipelineOperationStatus stopRelays(List<NamedStreamPipesEntity> predecessors, InvocableStreamPipesEntity target){
-    Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays = findRelays(predecessors, target);
-    return new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(),  new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).detachRelays(relays);
-  }
-
-  private PipelineOperationStatus startRelays(List<NamedStreamPipesEntity> predecessors, InvocableStreamPipesEntity target){
-    Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays = findRelays(predecessors, target);
-    return new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(),  new ArrayList<>(), new ArrayList<>(), new ArrayList<>()).invokeRelays(relays);
-  }
-
-  private Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> findRelays(List<NamedStreamPipesEntity> predecessors,
-                                                                             InvocableStreamPipesEntity target){
-
-    Map<NamedStreamPipesEntity, SpDataStreamRelayContainer> relays = new HashMap<>();
-
-    predecessors.forEach(pred -> {
-      List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
-      SpDataStreamRelayContainer dsRelayContainer = new SpDataStreamRelayContainer();
-
-      if (pred instanceof DataProcessorInvocation){
-        //Data Processor
-        DataProcessorInvocation processorInvocation = (DataProcessorInvocation) pred;
-        dataStreamRelays.addAll(processorInvocation.getOutputStreamRelays().stream().
-                filter(r ->
-                        target.getInputStreams().stream().map(s ->
-                                s.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName())
-                                .collect(Collectors.toSet()).contains(r.getEventGrounding().getTransportProtocol()
-                                .getTopicDefinition().getActualTopicName()))
-                .collect(Collectors.toList()));
-        dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
-        dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
-        dsRelayContainer.setName(processorInvocation.getName() + " (Stream Relay)");
-        dsRelayContainer.setInputGrounding(new EventGrounding(processorInvocation.getOutputStream().getEventGrounding()));
-        dsRelayContainer.setDeploymentTargetNodeId(processorInvocation.getDeploymentTargetNodeId());
-        dsRelayContainer.setDeploymentTargetNodeHostname(processorInvocation.getDeploymentTargetNodeHostname());
-        dsRelayContainer.setDeploymentTargetNodePort(processorInvocation.getDeploymentTargetNodePort());
-        dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
-        relays.put(pred, dsRelayContainer);
-      } else if (pred instanceof SpDataStream){
-        //DataStream
-        SpDataStream dataStream = (SpDataStream) pred;
-        if (!matchingDeploymentTargets(dataStream, target)){
-          //There is a relay that needs to be removed
-          dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
-                  .get(getIndex(pred.getDOM(), target))
-                  .getEventGrounding())));
-
-          dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
-          dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
-          dsRelayContainer.setName(dataStream.getName() + " (Stream Relay)");
-          dsRelayContainer.setInputGrounding(new EventGrounding(dataStream.getEventGrounding()));
-          dsRelayContainer.setDeploymentTargetNodeId(dataStream.getDeploymentTargetNodeId());
-          dsRelayContainer.setDeploymentTargetNodeHostname(dataStream.getDeploymentTargetNodeHostname());
-          dsRelayContainer.setDeploymentTargetNodePort(dataStream.getDeploymentTargetNodePort());
-          dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
-          relays.put(pred, dsRelayContainer);
-        }
-      }
-    });
-    return relays;
-  }
-
-
-  private List<NamedStreamPipesEntity> getPredecessors(NamedStreamPipesEntity source,
-                                                           InvocableStreamPipesEntity target, PipelineGraph pipelineGraph,
-                                                           List<NamedStreamPipesEntity> foundPredecessors){
-    //TODO: Check if this works for all graph topologies
-    if (pipelineGraph.outgoingEdgesOf(source)
-            .stream()
-            .map(pipelineGraph::getEdgeTarget)
-            .map(g -> (InvocableStreamPipesEntity) g)
-            .collect(Collectors.toSet()).contains(target)){
-      foundPredecessors.add(source);
-    } else{
-      List<NamedStreamPipesEntity> successors = pipelineGraph.outgoingEdgesOf(source)
-              .stream()
-              .map(pipelineGraph::getEdgeTarget)
-              .map(g -> (InvocableStreamPipesEntity) g)
-              .collect(Collectors.toList());
-      if (successors.isEmpty()) return foundPredecessors;
-      successors.forEach(successor -> getPredecessors(successor, target, pipelineGraph, foundPredecessors));
-    }
-    return foundPredecessors;
-  }
-
-  private NamedStreamPipesEntity findMatching(NamedStreamPipesEntity entity, PipelineGraph pipelineGraph){
-    AtomicReference<NamedStreamPipesEntity> match = new AtomicReference<>();
-    PipelineGraphHelpers.findStreams(pipelineGraph).forEach(ds -> {
-    NamedStreamPipesEntity foundEntity = compareGraphs(ds, entity, pipelineGraph, new ArrayList<>());
-    if (foundEntity != null) match.set(foundEntity);
-    });
-    return match.get();
-  }
-
-  private NamedStreamPipesEntity compareGraphs(NamedStreamPipesEntity source, NamedStreamPipesEntity searchedEntity, PipelineGraph pipelineGraph, List<NamedStreamPipesEntity> successors){
-    if(source.getDOM().equals(searchedEntity.getDOM())) return source;
-    else if (successors.isEmpty())
-      successors = pipelineGraph.outgoingEdgesOf(source)
-            .stream()
-            .map(pipelineGraph::getEdgeTarget)
-            .map(g -> (InvocableStreamPipesEntity) g)
-            .collect(Collectors.toList());
-    Optional<NamedStreamPipesEntity> successor= successors.stream().findFirst();
-    if (successor.isPresent()) {
-      successors.remove(successor.get());
-      return compareGraphs(successor.get(), searchedEntity, pipelineGraph, successors);
-    }
-    return null;
-  }
-
-  private PipelineOperationStatus initPipelineOperationStatus() {
-    //Duplicate from method in GraphSubmitter
-    PipelineOperationStatus status = new PipelineOperationStatus();
-    status.setPipelineId(pipeline.getPipelineId());
-    status.setPipelineName(pipeline.getName());
-    return status;
-  }
-
-
-  private String extractTopic(EventGrounding eg) {
-    return eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
-  }
-
-  private List<SpDataStreamRelayContainer> generateDataStreamRelays(List<InvocableStreamPipesEntity> graphs) {
-    Set<String> topicSet = new HashSet<>();
-    List<SpDataStreamRelayContainer> dsRelayContainers = new ArrayList<>();
-    SpDataStreamRelayContainer dsRelayContainer = new SpDataStreamRelayContainer();
-    List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
-
-    graphs.stream()
-            .filter(g -> pipeline.getStreams().stream()
-                    .filter(ds -> topicSet.add(extractTopic(ds.getEventGrounding())))
-                    .anyMatch(ds -> (!matchingDeploymentTargets(ds, g) && connected(ds, g))))
-            .forEach(g -> pipeline.getStreams()
-                    .forEach(ds -> {
-                      dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
-                      dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
-                      dsRelayContainer.setName(ds.getName() + " (Stream Relay)");
-                      dsRelayContainer.setInputGrounding(new EventGrounding(ds.getEventGrounding()));
-                      dsRelayContainer.setDeploymentTargetNodeId(ds.getDeploymentTargetNodeId());
-                      dsRelayContainer.setDeploymentTargetNodeHostname(ds.getDeploymentTargetNodeHostname());
-                      dsRelayContainer.setDeploymentTargetNodePort(ds.getDeploymentTargetNodePort());
-
-                      dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(g.getInputStreams()
-                              .get(getIndex(ds.getDOM(), g))
-                              .getEventGrounding())));
-                    })
-            );
-
-    dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
-    dsRelayContainers.add(dsRelayContainer);
-
-    return dsRelayContainers;
-  }
-
-  private boolean matchingDeploymentTargets(SpDataStream source, InvocableStreamPipesEntity target) {
-    return source.getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
-  }
-
-  private boolean connected(SpDataStream source, InvocableStreamPipesEntity target) {
-    int index = getIndex(source.getDOM(), target);
-    if (index != -1) {
-      return target.getConnectedTo().get(index).equals(source.getDOM());
-    }
-    return false;
-  }
-
-  private void updateGroupIds(InvocableStreamPipesEntity entity) {
-    entity.getInputStreams()
-            .stream()
-            .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
-            .map(is -> is.getEventGrounding().getTransportProtocol())
-            .map(KafkaTransportProtocol.class::cast)
-            .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
-  }
-
-  private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
-    List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
-    graphs.stream().map(g -> {
-      if (g instanceof DataProcessorInvocation) {
-        return new DataProcessorInvocation((DataProcessorInvocation) g);
-      } else {
-        return new DataSinkInvocation((DataSinkInvocation) g);
-      }
-    }).forEach(g -> {
-      g.getStaticProperties()
-              .stream()
-              .filter(SecretStaticProperty.class::isInstance)
-              .forEach(sp -> {
-                try {
-                  String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
-                          ((SecretStaticProperty) sp).getValue());
-                  ((SecretStaticProperty) sp).setValue(decrypted);
-                  ((SecretStaticProperty) sp).setEncrypted(false);
-                } catch (GeneralSecurityException e) {
-                  e.printStackTrace();
-                }
-              });
-      decryptedGraphs.add(g);
-    });
-    return decryptedGraphs;
-  }
-
-  private void setPipelineStarted(Pipeline pipeline) {
-    pipeline.setRunning(true);
-    pipeline.setStartedAt(new Date().getTime());
-    try {
-      getPipelineStorageApi().updatePipeline(pipeline);
-    } catch (DocumentConflictException dce) {
-      //dce.printStackTrace();
-    }
-  }
-
-  private void setPipelineStopped(Pipeline pipeline) {
-    pipeline.setRunning(false);
-    getPipelineStorageApi().updatePipeline(pipeline);
-  }
-
-  private void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs,
-                                     List<SpDataSet> dataSets) {
-    TemporaryGraphStorage.graphStorage.put(pipelineId, graphs);
-    TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets);
-  }
-
-  private IPipelineStorage getPipelineStorageApi() {
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
-  }
-
-  private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity targetElement) {
-    return targetElement.getConnectedTo().indexOf(sourceDomId);
-  }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
index e62ff41..5ee8b51 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
@@ -16,7 +16,7 @@
  *
  */
 package org.apache.streampipes.manager.execution.http;
-import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 
 public class StreamRelayEndpointUrlGenerator extends EndpointUrlGenerator<SpDataStreamRelayContainer> {
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
new file mode 100644
index 0000000..af0aced
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.pipeline;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.manager.data.PipelineGraph;
+import org.apache.streampipes.manager.data.PipelineGraphHelpers;
+import org.apache.streampipes.manager.execution.http.GraphSubmitter;
+import org.apache.streampipes.manager.node.NodeClusterManager;
+import org.apache.streampipes.manager.util.TemporaryGraphStorage;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.apache.streampipes.user.management.encryption.CredentialsManager;
+
+import java.security.GeneralSecurityException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public abstract class AbstractPipelineExecutor {
+
+    protected Pipeline pipeline;
+    protected boolean visualize;
+    protected boolean storeStatus;
+    protected boolean monitor;
+
+    public AbstractPipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor) {
+        this.pipeline = pipeline;
+        this.visualize = visualize;
+        this.storeStatus = storeStatus;
+        this.monitor = monitor;
+    }
+
+    // standard methods
+    protected void setPipelineStarted(Pipeline pipeline) {
+        pipeline.setRunning(true);
+        pipeline.setStartedAt(new Date().getTime());
+        getPipelineStorageApi().updatePipeline(pipeline);
+    }
+
+    protected void setPipelineStopped(Pipeline pipeline) {
+        pipeline.setRunning(false);
+        getPipelineStorageApi().updatePipeline(pipeline);
+    }
+
+    protected void deleteVisualization(String pipelineId) {
+        StorageDispatcher.INSTANCE
+                .getNoSqlStore()
+                .getVisualizationStorageApi()
+                .deleteVisualization(pipelineId);
+    }
+
+    protected void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs,
+                                       List<SpDataSet> dataSets) {
+        TemporaryGraphStorage.graphStorage.put(pipelineId, graphs);
+        TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets);
+    }
+
+    protected void storeDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
+        relays.forEach(NodeClusterManager::persistDataStreamRelay);
+    }
+
+    protected void deleteDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
+        relays.forEach(NodeClusterManager::deleteDataStreamRelay);
+    }
+
+    protected void updateDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
+        relays.forEach(NodeClusterManager::updateDataStreamRelay);
+    }
+
+
+    /**
+     * Invoke the given pipeline elements together with the given relays through a GraphSubmitter.
+     * No data sets are passed to the submitter (an empty list is used).
+     *
+     * @param graphs    data processors/sinks to invoke
+     * @param relays    relay containers to invoke alongside the graphs
+     * @return status of the invocation; if graphs is empty, a fresh status from initPipelineOperationStatus() is
+     * returned and nothing is submitted
+     */
+    protected PipelineOperationStatus startPipelineElementsAndRelays(List<InvocableStreamPipesEntity> graphs,
+                                                                     List<SpDataStreamRelayContainer> relays){
+        // NOTE(review): relays are not submitted either when graphs is empty - confirm this is intended
+        if (graphs.isEmpty()) {
+            return initPipelineOperationStatus();
+        }
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+                graphs, new ArrayList<>(), relays).invokePipelineElementsAndRelays();
+    }
+
+    /**
+     * Detach the given pipeline elements together with the given relays through a GraphSubmitter.
+     * No data sets are passed to the submitter (an empty list is used).
+     *
+     * @param graphs    data processors/sinks to detach
+     * @param relays    relay containers to detach alongside the graphs
+     * @return status of the detach operation; if graphs is empty, a fresh status from
+     * initPipelineOperationStatus() is returned and nothing is submitted
+     */
+    protected PipelineOperationStatus stopPipelineElementsAndRelays(List<InvocableStreamPipesEntity> graphs,
+                                                                    List<SpDataStreamRelayContainer> relays){
+        // NOTE(review): relays are not detached either when graphs is empty - confirm this is intended
+        if (graphs.isEmpty()) {
+            return initPipelineOperationStatus();
+        }
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+                graphs, new ArrayList<>(),relays).detachPipelineElementsAndRelays();
+    }
+
+    protected PipelineOperationStatus startRelays(List<SpDataStreamRelayContainer> relays){
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), new ArrayList<>(), new ArrayList<>(),
+                relays).invokeRelaysOnMigration();
+    }
+
+    protected PipelineOperationStatus stopRelays(List<SpDataStreamRelayContainer> relays){
+        return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),new ArrayList<>(), new ArrayList<>(),
+                relays).detachRelaysOnMigration();
+    }
+
+    /**
+     * Build the relay containers between the given predecessors and the target element. A container is only
+     * created for a predecessor whose deployment target node id differs from the one of the target.
+     * Predecessors that are neither a DataProcessorInvocation nor a SpDataStream are ignored.
+     *
+     * @param predecessors  data streams/processors to inspect
+     * @param target        data processor/sink
+     * @return list of relay containers, empty if no predecessor is deployed on a different node
+     */
+    protected List<SpDataStreamRelayContainer> findRelays(List<NamedStreamPipesEntity> predecessors,
+                                                          InvocableStreamPipesEntity target){
+
+        List<SpDataStreamRelayContainer> relays = new ArrayList<>();
+
+        for (NamedStreamPipesEntity pred : predecessors) {
+            if (pred instanceof DataProcessorInvocation) {
+                DataProcessorInvocation graph = (DataProcessorInvocation) pred;
+                if (differentDeploymentTargets(pred, target)) {
+                    // keep only those output relays of the processor whose topic is consumed by the target
+                    List<SpDataStreamRelay> dataStreamRelays =
+                            new ArrayList<>(findRelaysWithMatchingTopic(graph, target));
+
+                    SpDataStreamRelayContainer relayContainer = new SpDataStreamRelayContainer(graph);
+                    relayContainer.setOutputStreamRelays(dataStreamRelays);
+
+                    relays.add(relayContainer);
+                }
+            } else if (pred instanceof SpDataStream) {
+                SpDataStream stream = (SpDataStream) pred;
+                if (differentDeploymentTargets(stream, target)) {
+                    // relay towards the grounding of the target input stream that is connected to this stream
+                    List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
+                    dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
+                            .get(getIndex(pred.getDOM(), target))
+                            .getEventGrounding())));
+
+                    String id = extractUniqueAdpaterId(stream.getElementId());
+                    String relayStrategy = pipeline.getEventRelayStrategy();
+
+                    relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
+                }
+            }
+        }
+        return relays;
+    }
+
+    /**
+     * Collect every element reachable from source that has an outgoing edge to target. If source itself points to
+     * target it is added and the search does not descend further from it; otherwise the search recurses into
+     * every target of source. No de-duplication is performed on the result.
+     *
+     * @param source            element the search starts from
+     * @param target            data processor/sink whose predecessors are searched
+     * @param pipelineGraph     graph providing the outgoing edges
+     * @param foundPredecessors accumulator the matches are appended to
+     * @return the foundPredecessors list that was passed in
+     */
+    protected List<NamedStreamPipesEntity> getPredecessors(NamedStreamPipesEntity source,
+                                                           InvocableStreamPipesEntity target,
+                                                           PipelineGraph pipelineGraph,
+                                                           List<NamedStreamPipesEntity> foundPredecessors){
+
+        Set<InvocableStreamPipesEntity> targets = getTargetsAsSet(source, pipelineGraph,
+                InvocableStreamPipesEntity.class);
+
+        //TODO: Check if this works for all graph topologies
+        if (targets.contains(target)){
+            foundPredecessors.add(source);
+        } else {
+            List<NamedStreamPipesEntity> successors = getTargetsAsList(source, pipelineGraph,
+                    NamedStreamPipesEntity.class);
+
+            // leaf reached without finding the target on this path
+            if (successors.isEmpty()) return foundPredecessors;
+            successors.forEach(successor -> getPredecessors(successor, target, pipelineGraph, foundPredecessors));
+        }
+        return foundPredecessors;
+    }
+
+    /**
+     * Look up the element of the pipeline graph that corresponds to the given entity. The search is started once
+     * from every data stream of the graph; if several searches succeed, the result of the last one is returned.
+     *
+     * @param entity        element to look for
+     * @param pipelineGraph graph to search in
+     * @return matching element of the graph, or null if no search succeeded
+     */
+    protected NamedStreamPipesEntity findMatching(NamedStreamPipesEntity entity, PipelineGraph pipelineGraph){
+        NamedStreamPipesEntity lastMatch = null;
+
+        for (SpDataStream stream : PipelineGraphHelpers.findStreams(pipelineGraph)) {
+            NamedStreamPipesEntity candidate = compareGraphs(stream, entity, pipelineGraph, new ArrayList<>());
+            if (candidate != null) {
+                lastMatch = candidate;
+            }
+        }
+        return lastMatch;
+    }
+
+    private NamedStreamPipesEntity compareGraphs(NamedStreamPipesEntity source,
+                                                   NamedStreamPipesEntity searchedEntity,
+                                                   PipelineGraph pipelineGraph,
+                                                   List<NamedStreamPipesEntity> successors){
+        if(matchingDOM(source, searchedEntity)) {
+            return source;
+        } else if (successors.isEmpty()) {
+            successors = getTargetsAsList(source, pipelineGraph, NamedStreamPipesEntity.class);
+            Optional<NamedStreamPipesEntity> successor = successors.stream().findFirst();
+            if (successor.isPresent()) {
+                successors.remove(successor.get());
+                return compareGraphs(successor.get(), searchedEntity, pipelineGraph, successors);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Generate the relay containers for the given graphs and drop those without any output stream relay.
+     *
+     * @param graphs    data processors/sinks
+     * @return relay containers that hold at least one output stream relay
+     */
+    protected List<SpDataStreamRelayContainer> generateRelays(List<InvocableStreamPipesEntity> graphs) {
+        List<SpDataStreamRelayContainer> candidates = generateDataStreamRelays(graphs);
+        return candidates.stream()
+                .filter(container -> !container.getOutputStreamRelays().isEmpty())
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Create one relay container for every (stream, graph) and every (processor, graph) pair of the pipeline that
+     * is connected and deployed on different nodes. Pairs are not merged: a stream or processor connected to
+     * several graphs yields one container per graph.
+     *
+     * @param graphs    data processors/sinks to generate relays for
+     * @return list of relay containers, possibly empty
+     */
+    private List<SpDataStreamRelayContainer> generateDataStreamRelays(List<InvocableStreamPipesEntity> graphs) {
+        List<SpDataStreamRelayContainer> relays = new ArrayList<>();
+
+        for (InvocableStreamPipesEntity graph : graphs) {
+            for (SpDataStream stream: pipeline.getStreams()) {
+                if (differentDeploymentTargets(stream, graph) && connected(stream, graph)) {
+
+                    // relay towards the grounding of the graph input stream that is connected to this stream
+                    List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
+                    dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(graph.getInputStreams()
+                            .get(getIndex(stream.getDOM(), graph))
+                            .getEventGrounding())));
+
+                    String id = extractUniqueAdpaterId(stream.getElementId());
+                    String relayStrategy = pipeline.getEventRelayStrategy();
+
+                    relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
+                }
+            }
+            for (DataProcessorInvocation processor : pipeline.getSepas()) {
+                if (differentDeploymentTargets(processor, graph) && connected(processor, graph)) {
+                    // NOTE(review): presumably the constructor takes over the processor's output stream relays
+                    // - confirm against SpDataStreamRelayContainer
+                    SpDataStreamRelayContainer processorRelay = new SpDataStreamRelayContainer(processor);
+                    relays.add(processorRelay);
+                }
+            }
+        }
+        return relays;
+    }
+
+
+    // Helpers
+
+    /**
+     * Updates group.id for data processor/sink. Note: KafkaTransportProtocol only!!
+     *
+     * @param entity    data processor/sink
+     */
+    protected void updateKafkaGroupIds(InvocableStreamPipesEntity entity) {
+        entity.getInputStreams()
+                .stream()
+                .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
+                .map(is -> is.getEventGrounding().getTransportProtocol())
+                .map(KafkaTransportProtocol.class::cast)
+                .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
+    }
+
+    /**
+     * Decrypt potential secrets contained in static properties, e.g., passwords. Decryption is applied to copies
+     * of the given graphs that are created through the invocation copy constructors.
+     *
+     * @param graphs    List of graphs; every element that is not a DataProcessorInvocation is cast to
+     *                  DataSinkInvocation
+     * @return  List of decrypted graphs (the copies, in the order of the input list)
+     */
+    protected List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+        List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
+        // NOTE(review): presumably the copy constructors clone the static properties so that the graphs passed in
+        // keep their encrypted values - confirm
+        graphs.stream().map(g -> {
+            if (g instanceof DataProcessorInvocation) {
+                return new DataProcessorInvocation((DataProcessorInvocation) g);
+            } else {
+                return new DataSinkInvocation((DataSinkInvocation) g);
+            }
+        }).forEach(g -> {
+            g.getStaticProperties()
+                    .stream()
+                    .filter(SecretStaticProperty.class::isInstance)
+                    .forEach(sp -> {
+                        try {
+                            // secrets are decrypted with the credentials of the user who created the pipeline
+                            String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
+                                    ((SecretStaticProperty) sp).getValue());
+                            ((SecretStaticProperty) sp).setValue(decrypted);
+                            ((SecretStaticProperty) sp).setEncrypted(false);
+                        } catch (GeneralSecurityException e) {
+                            // a failed decryption is only printed; the property keeps its encrypted value and flag
+                            e.printStackTrace();
+                        }
+                    });
+            decryptedGraphs.add(g);
+        });
+        return decryptedGraphs;
+    }
+
+    /**
+     * Get pipeline storage dispatcher API
+     *
+     * @return IPipelineStorage NoSQL storage interface for pipelines
+     */
+    private IPipelineStorage getPipelineStorageApi() {
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
+    }
+
+    /**
+     * Extract the actual topic name from the event grounding of a data stream or data stream relay.
+     *
+     * @param entity    data stream or data stream relay
+     * @return actual topic name of the topic definition of the entity's transport protocol
+     * @throws SpRuntimeException if entity is neither a SpDataStream nor a SpDataStreamRelay
+     */
+    private String extractActualTopic(NamedStreamPipesEntity entity) {
+        if (entity instanceof SpDataStream) {
+            return ((SpDataStream) entity)
+                    .getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
+        } else if (entity instanceof SpDataStreamRelay) {
+            return ((SpDataStreamRelay) entity)
+                    .getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
+        }
+        throw new SpRuntimeException("Could not extract actual topic name from entity");
+    }
+
+    // Edge / Migration Helpers
+
+    /**
+     * Compare deployment targets of two pipeline elements, namely data stream/processor (source) and data
+     * processor/sink (target)
+     *
+     * @param source    data stream/processor
+     * @param target    data processor/sink
+     * @return true if the deployment target node ids of source and target differ, false if they are equal
+     * @throws SpRuntimeException if source is neither a SpDataStream nor a DataProcessorInvocation
+     */
+    private boolean differentDeploymentTargets(NamedStreamPipesEntity source, InvocableStreamPipesEntity target) {
+        if (source instanceof SpDataStream) {
+            return !((SpDataStream) source).getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
+        } else if (source instanceof DataProcessorInvocation) {
+            return !((DataProcessorInvocation) source).getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
+        }
+        throw new SpRuntimeException("Matching deployment targets check failed");
+    }
+
+    /**
+     * Find those output stream relays of a data processor whose actual topic name equals the actual topic name of
+     * one of the target's input streams.
+     *
+     * @param graph     data processor
+     * @param target    data processor/sink
+     * @return collection of data stream relays with a matching topic, empty if there is none
+     */
+    private Collection<? extends SpDataStreamRelay> findRelaysWithMatchingTopic(DataProcessorInvocation graph,
+                                                                                InvocableStreamPipesEntity target) {
+        // collect the target topics once instead of rebuilding the set for every relay
+        Set<String> targetTopics = target.getInputStreams().stream()
+                .map(this::extractActualTopic)
+                .collect(Collectors.toSet());
+
+        return graph.getOutputStreamRelays().stream()
+                .filter(relay -> targetTopics.contains(extractActualTopic(relay)))
+                .collect(Collectors.toList());
+    }
+
+
+    private <T> Set<T> getTargetsAsSet(NamedStreamPipesEntity source, PipelineGraph pipelineGraph,
+                                       Class<T> clazz){
+        return pipelineGraph.outgoingEdgesOf(source)
+                .stream()
+                .map(pipelineGraph::getEdgeTarget)
+                .map(clazz::cast)
+                .collect(Collectors.toSet());
+    }
+
+    private <T> List<T> getTargetsAsList(NamedStreamPipesEntity source, PipelineGraph pipelineGraph,
+                                         Class<T> clazz){
+        return new ArrayList<>(getTargetsAsSet(source, pipelineGraph, clazz));
+    }
+
+    /**
+     * Check by DOM identifier whether a data stream/processor (source) is connected to a data processor/sink
+     * (target).
+     *
+     * @param source    data stream or data processor
+     * @param target    data processor/sink
+     * @return true if the DOM identifier of source is listed in the connections of target, else false
+     */
+    private boolean connected(NamedStreamPipesEntity source, InvocableStreamPipesEntity target) {
+        int position = getIndex(source.getDOM(), target);
+        return position != -1 && target.getConnectedTo().get(position).equals(source.getDOM());
+    }
+
+    /**
+     * Get index of data processor/sink connection based on source DOM identifier
+     *
+     * @param sourceDomId   source DOM identifier
+     * @param target        data processor/sink
+     * @return Integer with index of connection, if invalid returns -1.
+     */
+    private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity target) {
+        return target.getConnectedTo().indexOf(sourceDomId);
+    }
+
+    /**
+     * Checks if DOM are equal
+     *
+     * @param source pipeline element
+     * @param target pipeline element
+     * @return true if DOM is the same, else false
+     */
+    private boolean matchingDOM(NamedStreamPipesEntity source, NamedStreamPipesEntity target) {
+        return source.getDOM().equals(target.getDOM());
+    }
+
+    /**
+     * Get List of InvocableStreamPipes entities, i.e., data processors/sinks from list of NameStreamPipesEntity
+     *
+     * @param graphs    List<NamedStreamPipesEntity> graphs
+     * @return
+     */
+    private List<InvocableStreamPipesEntity> getListOfInvocableStreamPipesEntity(List<NamedStreamPipesEntity> graphs) {
+        List<InvocableStreamPipesEntity> invocableEntities = new ArrayList<>();
+        graphs.stream()
+                .filter(i -> i instanceof InvocableStreamPipesEntity)
+                .forEach(i -> invocableEntities.add((InvocableStreamPipesEntity) i));
+        return invocableEntities;
+    }
+
+    /**
+     * Create pipeline operation status with pipeline id and name and set success to true
+     *
+     * @return PipelineOperationStatus
+     */
+    protected PipelineOperationStatus initPipelineOperationStatus() {
+        PipelineOperationStatus status = new PipelineOperationStatus();
+        status.setPipelineId(pipeline.getPipelineId());
+        status.setPipelineName(pipeline.getName());
+        status.setSuccess(true);
+        return status;
+    }
+
+    private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
+        return graphs
+                .stream()
+                .filter(clazz::isInstance)
+                .map(clazz::cast)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Return the part of the given string that follows its last "/" character.
+     *
+     * @param s string to shorten; callers in this class pass the element id of a data stream
+     * @return substring after the last "/", or s itself if it contains no "/"
+     */
+    private String extractUniqueAdpaterId(String s) {
+        return s.substring(s.lastIndexOf("/") + 1);
+    }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
new file mode 100644
index 0000000..17aa1ce
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution.pipeline;
+
+import org.apache.streampipes.manager.execution.http.GraphSubmitter;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
+import org.apache.streampipes.manager.util.TemporaryGraphStorage;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.message.PipelineStatusMessage;
+import org.apache.streampipes.model.message.PipelineStatusMessageType;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class PipelineExecutor extends AbstractPipelineExecutor {
+
+    public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor) {
+        super(pipeline, visualize, storeStatus, monitor);
+    }
+
+    /**
+     * Start the pipeline: invoke all data processors, data sinks, data sets and the generated relays through a
+     * GraphSubmitter. On success the invocation graphs and relays are stored, a PIPELINE_STARTED status message
+     * is added and, if storeStatus is set, the pipeline is marked as running.
+     *
+     * @return status returned by the graph submitter
+     */
+    public PipelineOperationStatus startPipeline() {
+
+        // every input stream with a Kafka transport protocol gets a fresh random group id
+        pipeline.getSepas().forEach(this::updateKafkaGroupIds);
+        pipeline.getActions().forEach(this::updateKafkaGroupIds);
+
+        List<DataProcessorInvocation> sepas = pipeline.getSepas();
+        List<DataSinkInvocation> secs = pipeline.getActions();
+
+        // copy the data sets among the pipeline streams and tag the copies with this pipeline id
+        List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
+                SpDataSet((SpDataSet) s)).collect(Collectors.toList());
+
+        for (SpDataSet ds : dataSets) {
+          ds.setCorrespondingPipeline(pipeline.getPipelineId());
+        }
+
+        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
+        graphs.addAll(sepas);
+        graphs.addAll(secs);
+
+        List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
+
+        // stream requirements are cleared on the original graphs only after the decrypted copies were created
+        graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
+
+        List<SpDataStreamRelayContainer> relays = generateRelays(decryptedGraphs);
+
+        // the decrypted copies are submitted ...
+        PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+                decryptedGraphs, dataSets, relays).invokePipelineElementsAndRelays();
+
+        if (status.isSuccess()) {
+            // ... while the original (non-decrypted) graphs are kept in the temporary storage
+            storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+            storeDataStreamRelayContainer(relays);
+
+            PipelineStatusManager.addPipelineStatus(
+                  pipeline.getPipelineId(),
+                  new PipelineStatusMessage(pipeline.getPipelineId(),
+                          System.currentTimeMillis(),
+                          PipelineStatusMessageType.PIPELINE_STARTED.title(),
+                          PipelineStatusMessageType.PIPELINE_STARTED.description()));
+
+          if (storeStatus) setPipelineStarted(pipeline);
+        }
+        return status;
+    }
+
+    /**
+     * Stop the pipeline: detach the stored invocation graphs, data sets and the relays generated from the stored
+     * graphs through a GraphSubmitter. On success the visualization is deleted (if visualize is set), the
+     * pipeline is marked as stopped (if storeStatus is set), the relays are deleted and a PIPELINE_STOPPED
+     * status message is added.
+     *
+     * @return status returned by the graph submitter
+     */
+    public PipelineOperationStatus stopPipeline() {
+        // NOTE(review): if nothing is stored for this pipeline id the lookups presumably return null, in which
+        // case generateRelays(graphs) would fail - confirm against TemporaryGraphStorage
+        List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
+        List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
+        // relays are re-derived from the stored graphs rather than read from storage
+        List<SpDataStreamRelayContainer> relays = generateRelays(graphs);
+
+        PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), graphs,
+                dataSets, relays).detachPipelineElementsAndRelays();
+
+        if (status.isSuccess()) {
+            if (visualize) deleteVisualization(pipeline.getPipelineId());
+            if (storeStatus) setPipelineStopped(pipeline);
+
+            deleteDataStreamRelayContainer(relays);
+
+            PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
+                  new PipelineStatusMessage(pipeline.getPipelineId(),
+                          System.currentTimeMillis(),
+                          PipelineStatusMessageType.PIPELINE_STOPPED.title(),
+                          PipelineStatusMessageType.PIPELINE_STOPPED.description()));
+        }
+        return status;
+    }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
new file mode 100644
index 0000000..e610d24
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.pipeline;
+
+import org.apache.streampipes.manager.data.PipelineGraph;
+import org.apache.streampipes.manager.data.PipelineGraphBuilder;
+import org.apache.streampipes.manager.data.PipelineGraphHelpers;
+import org.apache.streampipes.manager.execution.http.GraphSubmitter;
+import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
+
+    /**
+     * Old pipeline before migration
+     */
+    private final Pipeline pipelineBeforeMigration;
+
+    /**
+     * Pipeline element to be migrated. Contains pair of source and target element description
+     */
+    private final PipelineElementMigrationEntity migrationEntity;
+
+    /**
+     * Predecessors of the pipeline element to be migrated in the migration pipeline
+     */
+    private final List<NamedStreamPipesEntity> predecessorsAfterMigration;
+
+    /**
+     * Predecessors of the pipeline element to be migrated in the old pipeline before migration
+     */
+    private final List<NamedStreamPipesEntity> predecessorsBeforeMigration;
+
+    /**
+     * Collection of relays that were created in the migration process needing to be stored
+     */
+    private final List<SpDataStreamRelayContainer> relaysToBePersisted;
+
+    /**
+     * Collection of relays that were removed in the migration process needing to be deleted from persistent storage.
+     */
+    private final List<SpDataStreamRelayContainer> relaysToBeDeleted;
+
+
+    /**
+     * Creates an executor for migrating a single pipeline element.
+     *
+     * @param pipeline                pipeline handed to the superclass constructor
+     * @param pipelineBeforeMigration old pipeline before migration
+     * @param migrationEntity         pair of source and target element description
+     * @param visualize               forwarded to the superclass constructor
+     * @param storeStatus             forwarded to the superclass constructor
+     * @param monitor                 forwarded to the superclass constructor
+     */
+    public PipelineMigrationExecutor(Pipeline pipeline, Pipeline pipelineBeforeMigration,
+                                     PipelineElementMigrationEntity migrationEntity,
+                                     boolean visualize, boolean storeStatus, boolean monitor) {
+        super(pipeline, visualize, storeStatus, monitor);
+
+        // Migration input
+        this.migrationEntity = migrationEntity;
+        this.pipelineBeforeMigration = pipelineBeforeMigration;
+
+        // Working state, all collections start out empty
+        this.relaysToBeDeleted = new ArrayList<>();
+        this.relaysToBePersisted = new ArrayList<>();
+        this.predecessorsBeforeMigration = new ArrayList<>();
+        this.predecessorsAfterMigration = new ArrayList<>();
+    }
+
+    // TODO: refactor!
+    /**
+     * Migrates the pipeline element described by {@code migrationEntity} from its source to its
+     * target element description.
+     *
+     * After {@link #prepareMigration()} four steps run in order: start the target element and its
+     * relays, stop the relays from the predecessors to the source element, start the relays from the
+     * predecessors to the target element, stop the source element and its relays. The element
+     * statuses of each step are merged into one overall status. If a step reports failure, the
+     * matching rollback method is called (none for the first step) and the overall status is
+     * returned immediately; its success flag is not set by this method on those paths.
+     *
+     * When all steps succeed, the invocation graphs (actions and sepas of {@code pipeline}) and data
+     * sets are stored, the collected relays are persisted or deleted, and the overall success flag is
+     * set to whether every recorded element status is successful.
+     *
+     * @return the overall status containing the element statuses of all executed steps and rollbacks
+     */
+    public PipelineOperationStatus migratePipelineElement() {
+
+        PipelineOperationStatus status = initPipelineOperationStatus();
+
+        // 1. start new element
+        // 2. stop relay to origin element
+        // 3. start relay to new element
+        // 4. stop origin element
+        // 5. stop origin relay
+
+        prepareMigration();
+
+        // Start target pipeline elements and relays on new target node
+        PipelineOperationStatus statusStartTarget = startTargetPipelineElementsAndRelays(status);
+        if(!statusStartTarget.isSuccess()){
+            //Target PE could not be started; nothing to roll back
+            return status;
+        }
+
+        // Stop relays from origin predecessor
+        PipelineOperationStatus statusStopRelays = stopRelaysFromPredecessorsBeforeMigration(status);
+        if(!statusStopRelays.isSuccess()){
+            // Undo step 1 and restart the relays that were already stopped
+            rollbackToPreMigrationStepOne(statusStopRelays, status);
+            return status;
+        }
+
+        // Start relays to target after migration
+        PipelineOperationStatus statusStartRelays = startRelaysFromPredecessorsAfterMigration(status);
+        if(!statusStartRelays.isSuccess()){
+            // Undo steps 1 and 2 and stop the relays that were already started
+            rollbackToPreMigrationStepTwo(statusStartRelays, status);
+            return status;
+        }
+
+        //Stop origin and associated relay
+        PipelineOperationStatus statusStopOrigin = stopOriginPipelineElementsAndRelays(status);
+        if(!statusStopOrigin.isSuccess()){
+            rollbackToPreMigrationStepThree(status);
+            return status;
+        }
+
+        // All steps succeeded: collect sinks (actions) and processors (sepas) of the migrated pipeline
+        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
+        graphs.addAll(pipeline.getActions());
+        graphs.addAll(pipeline.getSepas());
+
+        List<SpDataSet> dataSets = findDataSets();
+
+        // store new pipeline and relays
+        storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+        storeDataStreamRelayContainer(relaysToBePersisted);
+        deleteDataStreamRelayContainer(relaysToBeDeleted);
+
+        // set global status
+        status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
+
+        return status;
+    }
+
+    /**
+     * Prepares the executor state needed by the migration steps: clears the output stream relays of
+     * the processors in {@code pipeline}, builds the graph of the pipeline after migration and runs
+     * the invocation graph builder on it, then fills {@code predecessorsAfterMigration} and
+     * {@code predecessorsBeforeMigration}.
+     */
+    private void prepareMigration() {
+        //Purge existing relays
+        purgeExistingRelays();
+
+        //Generate new relays
+        PipelineGraph pipelineGraphAfterMigration = new PipelineGraphBuilder(pipeline).buildGraph();
+        buildGraphWithRelays(pipelineGraphAfterMigration);
+
+        //Get predecessors of the target element in the pipeline after migration
+        PipelineGraph pipelineGraphBeforeMigration = new PipelineGraphBuilder(pipelineBeforeMigration).buildGraph();
+        findPredecessorsInMigrationPipeline(pipelineGraphAfterMigration);
+
+        //Find counterpart for each of those predecessors in the pipeline before migration
+        findAndComparePredecessorsInCurrentPipeline(pipelineGraphBeforeMigration);
+    }
+
+    /**
+     * Rollback used when stopping the relays towards the source element failed: detaches the target
+     * element with its relays again and restarts those source relays whose stop was reported as
+     * successful. The statuses of both rollback operations are merged into {@code status}.
+     *
+     * @param statusStopRelays status of the failed relay-stop step; its successful entries identify
+     *                         the relays to restart
+     * @param status           overall migration status to append the rollback statuses to
+     */
+    private void rollbackToPreMigrationStepOne(PipelineOperationStatus statusStopRelays,
+                                               PipelineOperationStatus status) {
+        // Stop target pipeline element and relays on new target node
+        PipelineOperationStatus statusTargetRollback = rollbackTargetPipelineElementsAndRelays();
+
+        // Restart relays before migration attempt
+        // extract unique ids of the original relays that were stopped successfully
+        Set<String> relayIdsToRollback = extractUniqueRelayIds(statusStopRelays);
+        PipelineOperationStatus statusRelaysRollback = rollbackToOriginRelaysByInvoke(relayIdsToRollback);
+
+        // Add status to global migration status
+        updateMigrationStatus(statusTargetRollback, status);
+        updateMigrationStatus(statusRelaysRollback, status);
+    }
+
+    /**
+     * Rollback used when starting the relays towards the target element failed: detaches the target
+     * element with its relays, restarts all relays from the predecessors before migration to the
+     * source element, and stops those target relays whose start was reported as successful. The
+     * statuses of all three rollback operations are merged into {@code status}.
+     *
+     * @param statusStartRelays status of the failed relay-start step; its successful entries identify
+     *                          the relays to stop again
+     * @param status            overall migration status to append the rollback statuses to
+     */
+    private void rollbackToPreMigrationStepTwo(PipelineOperationStatus statusStartRelays,
+                                               PipelineOperationStatus status) {
+        //Rollback target PE, stopped relays and all successfully started relays
+        PipelineOperationStatus statusTargetRollback = rollbackTargetPipelineElementsAndRelays();
+
+        // Restart relays to origin from predecessors before migration
+        PipelineOperationStatus statusRelaysInvokeRollback = startRelays(findRelays(predecessorsBeforeMigration,
+                migrationEntity.getSourceElement()));
+
+        // Stop relays that were started due to migration attempt
+        // extract unique ids of the new relays that were started successfully
+        Set<String> relayIdsToRollback = extractUniqueRelayIds(statusStartRelays);
+        PipelineOperationStatus statusRelaysDetachRollback = rollbackTargetRelaysByDetach(relayIdsToRollback);
+
+        // Add status to global migration status
+        updateMigrationStatus(statusTargetRollback, status);
+        updateMigrationStatus(statusRelaysInvokeRollback, status);
+        updateMigrationStatus(statusRelaysDetachRollback, status);
+    }
+
+    /**
+     * Rollback used when stopping the source element and its relays failed: stops the relays from
+     * the predecessors after migration to the target element, starts the relays from the predecessors
+     * before migration to the source element, and stops the target element with its relays. The
+     * statuses of all three operations are merged into {@code status}.
+     *
+     * NOTE(review): this method does not start the source element itself; confirm that a partially
+     * stopped source element cannot occur or is handled elsewhere.
+     *
+     * @param status overall migration status to append the rollback statuses to
+     */
+    private void rollbackToPreMigrationStepThree(PipelineOperationStatus status) {
+        //Rollback everything
+        // Stop the relays that now feed the target element
+        PipelineOperationStatus statusStopRelays = stopRelays(findRelays(predecessorsAfterMigration,
+                migrationEntity.getTargetElement()));
+
+        // Start the relays that fed the source element before the migration
+        PipelineOperationStatus statusStartRelays = startRelays(findRelays(predecessorsBeforeMigration,
+                migrationEntity.getSourceElement()));
+
+        // Stop the target element together with the relays derived from it
+        List<InvocableStreamPipesEntity> graphs = Collections.singletonList(migrationEntity.getTargetElement());
+        List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(graphs);
+
+        PipelineOperationStatus statusStopTargetAndRelays = stopPipelineElementsAndRelays(graphs, relays);
+
+        // Add status to global migration status
+        updateMigrationStatus(statusStopRelays, status);
+        updateMigrationStatus(statusStartRelays, status);
+        updateMigrationStatus(statusStopTargetAndRelays, status);
+    }
+
+
+    /**
+     * Starts the relays between the predecessors before migration and the source element, restricted
+     * to those whose running relay instance id is contained in {@code relayIdsToRollback}.
+     *
+     * @param relayIdsToRollback ids of the relays to start again
+     * @return status of the relay start operation
+     */
+    private PipelineOperationStatus rollbackToOriginRelaysByInvoke(Set<String> relayIdsToRollback) {
+        return startRelays(findRelaysAndFilterById(
+                relayIdsToRollback, predecessorsBeforeMigration, migrationEntity.getSourceElement()));
+    }
+
+    /**
+     * Stops the relays between the predecessors after migration and the target element, restricted
+     * to those whose running relay instance id is contained in {@code relayIdsToRollback}.
+     *
+     * @param relayIdsToRollback ids of the relays to stop again
+     * @return status of the relay stop operation
+     */
+    private PipelineOperationStatus rollbackTargetRelaysByDetach(Set<String> relayIdsToRollback) {
+        return stopRelays(findRelaysAndFilterById(
+                relayIdsToRollback, predecessorsAfterMigration, migrationEntity.getTargetElement()));
+    }
+
+    /**
+     * Detaches the target element of the migration together with the relays derived from it, using a
+     * {@link GraphSubmitter} with an empty data set list.
+     *
+     * @return status of the detach operation
+     */
+    private PipelineOperationStatus rollbackTargetPipelineElementsAndRelays() {
+        List<InvocableStreamPipesEntity> targetOnly = Collections.singletonList(migrationEntity.getTargetElement());
+        List<SpDataStreamRelayContainer> targetRelays = extractRelaysFromDataProcessor(targetOnly);
+
+        GraphSubmitter submitter = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+                targetOnly, new ArrayList<>(), targetRelays);
+        return submitter.detachPipelineElementsAndRelays();
+    }
+
+    /**
+     * Starts the relays from the predecessors after migration to the target element. The relays are
+     * registered for persistence before they are started, and the resulting element statuses are
+     * merged into {@code status}.
+     *
+     * @param status overall migration status to append to
+     * @return status of this step only
+     */
+    private PipelineOperationStatus startRelaysFromPredecessorsAfterMigration(PipelineOperationStatus status) {
+        List<SpDataStreamRelayContainer> relaysToTarget =
+                findRelays(predecessorsAfterMigration, migrationEntity.getTargetElement());
+        updateRelaysToBePersisted(relaysToTarget);
+
+        PipelineOperationStatus stepStatus = startRelays(relaysToTarget);
+        updateMigrationStatus(stepStatus, status);
+        return stepStatus;
+    }
+
+    /**
+     * Stops the relays from the predecessors before migration to the source element. The relays are
+     * registered for deletion before they are stopped, and the resulting element statuses are merged
+     * into {@code status}.
+     *
+     * @param status overall migration status to append to
+     * @return status of this step only
+     */
+    private PipelineOperationStatus stopRelaysFromPredecessorsBeforeMigration(PipelineOperationStatus status) {
+        List<SpDataStreamRelayContainer> relaysToSource =
+                findRelays(predecessorsBeforeMigration, migrationEntity.getSourceElement());
+        updateRelaysToBeDeleted(relaysToSource);
+
+        PipelineOperationStatus stepStatus = stopRelays(relaysToSource);
+        updateMigrationStatus(stepStatus, status);
+        return stepStatus;
+    }
+
+    /**
+     * Starts the target element of the migration and the relays derived from it. The target element
+     * is passed through {@code decryptSecrets} first, the relays are registered for persistence
+     * before the start, and the resulting element statuses are merged into {@code status}.
+     *
+     * @param status overall migration status to append to
+     * @return status of this step only
+     */
+    private PipelineOperationStatus startTargetPipelineElementsAndRelays(PipelineOperationStatus status) {
+        List<InvocableStreamPipesEntity> targetGraphs =
+                decryptSecrets(Collections.singletonList(migrationEntity.getTargetElement()));
+        List<SpDataStreamRelayContainer> targetRelays = extractRelaysFromDataProcessor(targetGraphs);
+        updateRelaysToBePersisted(targetRelays);
+
+        PipelineOperationStatus stepStatus = startPipelineElementsAndRelays(targetGraphs, targetRelays);
+        updateMigrationStatus(stepStatus, status);
+        return stepStatus;
+    }
+
+    /**
+     * Stops the source element of the migration and the relays derived from it. The relays are
+     * registered for deletion before the stop, and the resulting element statuses are merged into
+     * {@code status}.
+     *
+     * @param status overall migration status to append to
+     * @return status of this step only
+     */
+    private PipelineOperationStatus stopOriginPipelineElementsAndRelays(PipelineOperationStatus status) {
+        List<InvocableStreamPipesEntity> sourceGraphs = Collections.singletonList(migrationEntity.getSourceElement());
+        List<SpDataStreamRelayContainer> sourceRelays = extractRelaysFromDataProcessor(sourceGraphs);
+        updateRelaysToBeDeleted(sourceRelays);
+
+        PipelineOperationStatus stepStatus = stopPipelineElementsAndRelays(sourceGraphs, sourceRelays);
+        updateMigrationStatus(stepStatus, status);
+        return stepStatus;
+    }
+
+    // Helpers
+
+    /**
+     * Adds every relay container that has at least one output stream relay to
+     * {@code relaysToBePersisted}; containers without output stream relays are ignored.
+     *
+     * @param relays candidate relay containers
+     */
+    private void updateRelaysToBePersisted(List<SpDataStreamRelayContainer> relays) {
+        for (SpDataStreamRelayContainer candidate : relays) {
+            if (!candidate.getOutputStreamRelays().isEmpty()) {
+                relaysToBePersisted.add(candidate);
+            }
+        }
+    }
+
+    /**
+     * Adds every relay container that has at least one output stream relay to
+     * {@code relaysToBeDeleted}; containers without output stream relays are ignored.
+     *
+     * @param relays candidate relay containers
+     */
+    private void updateRelaysToBeDeleted(List<SpDataStreamRelayContainer> relays) {
+        for (SpDataStreamRelayContainer candidate : relays) {
+            if (!candidate.getOutputStreamRelays().isEmpty()) {
+                relaysToBeDeleted.add(candidate);
+            }
+        }
+    }
+
+    /**
+     * Looks up the relays between the given predecessors and the target element and keeps only those
+     * whose running stream relay instance id is contained in {@code relayIdsToRollback}.
+     *
+     * @param relayIdsToRollback ids of the relays to keep
+     * @param predecessor        predecessors handed to {@code findRelays}
+     * @param target             element handed to {@code findRelays}
+     * @return the matching relay containers, in the order returned by {@code findRelays}
+     */
+    private List<SpDataStreamRelayContainer> findRelaysAndFilterById(Set<String> relayIdsToRollback,
+                                                                     List<NamedStreamPipesEntity> predecessor,
+                                                                     InvocableStreamPipesEntity target) {
+        List<SpDataStreamRelayContainer> matching = new ArrayList<>();
+        for (SpDataStreamRelayContainer relay : findRelays(predecessor, target)) {
+            if (relayIdsToRollback.contains(relay.getRunningStreamRelayInstanceId())) {
+                matching.add(relay);
+            }
+        }
+        return matching;
+    }
+
+    /**
+     * For each predecessor found in the pipeline after migration, adds the result of
+     * {@code findMatching} against the graph before migration to {@code predecessorsBeforeMigration}.
+     *
+     * @param pipelineGraphBeforeMigration graph of the pipeline before migration
+     */
+    private void findAndComparePredecessorsInCurrentPipeline(PipelineGraph pipelineGraphBeforeMigration) {
+        for (NamedStreamPipesEntity migrationPredecessor : predecessorsAfterMigration) {
+            predecessorsBeforeMigration.add(findMatching(migrationPredecessor, pipelineGraphBeforeMigration));
+        }
+    }
+
+    /**
+     * Fills {@code predecessorsAfterMigration}: for every stream found in the graph after migration,
+     * the result of {@code getPredecessors} for the target element is appended.
+     *
+     * @param pipelineGraphAfterMigration graph of the pipeline after migration
+     */
+    private void findPredecessorsInMigrationPipeline(PipelineGraph pipelineGraphAfterMigration) {
+        // A fresh empty list is handed to getPredecessors for every stream
+        PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).forEach(stream ->
+                predecessorsAfterMigration.addAll(getPredecessors(stream, migrationEntity.getTargetElement(),
+                        pipelineGraphAfterMigration, new ArrayList<>())));
+    }
+
+    /**
+     * Collects copies of all streams of {@code pipeline} that are data sets.
+     *
+     * @return a new list with one copy-constructed {@link SpDataSet} per data set stream
+     */
+    private List<SpDataSet> findDataSets() {
+        return pipeline.getStreams().stream()
+                .filter(SpDataSet.class::isInstance)
+                .map(SpDataSet.class::cast)
+                .map(SpDataSet::new)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Runs the {@link InvocationGraphBuilder} for the given graph and the id of {@code pipeline}.
+     * The builder's return value is discarded.
+     *
+     * @param pipelineGraphAfterMigration graph of the pipeline after migration
+     */
+    private void buildGraphWithRelays(PipelineGraph pipelineGraphAfterMigration) {
+        InvocationGraphBuilder graphBuilder =
+                new InvocationGraphBuilder(pipelineGraphAfterMigration, pipeline.getPipelineId());
+        graphBuilder.buildGraphs();
+    }
+
+    /**
+     * Replaces the output stream relays of every processor in {@code pipeline} with a new empty list.
+     */
+    private void purgeExistingRelays() {
+        for (DataProcessorInvocation processor : pipeline.getSepas()) {
+            processor.setOutputStreamRelays(new ArrayList<>());
+        }
+    }
+
+    /**
+     * Appends every element status of {@code partialStatus} to the global migration status.
+     *
+     * @param partialStatus status of a single migration or rollback step
+     * @param status        global migration status that receives the element statuses
+     */
+    private void updateMigrationStatus(PipelineOperationStatus partialStatus, PipelineOperationStatus status) {
+        for (PipelineElementStatus elementStatus : partialStatus.getElementStatus()) {
+            status.addPipelineElementStatus(elementStatus);
+        }
+    }
+
+    /**
+     * Creates one {@link SpDataStreamRelayContainer} per given graph. Every graph is cast to
+     * {@link DataProcessorInvocation}, so a {@link ClassCastException} is thrown for any element of
+     * another type.
+     *
+     * @param graphs invocable entities, expected to be data processor invocations
+     * @return a new list of relay containers in the order of {@code graphs}
+     */
+    private List<SpDataStreamRelayContainer> extractRelaysFromDataProcessor(List<InvocableStreamPipesEntity> graphs) {
+        List<SpDataStreamRelayContainer> containers = new ArrayList<>();
+        for (InvocableStreamPipesEntity graph : graphs) {
+            DataProcessorInvocation processor = DataProcessorInvocation.class.cast(graph);
+            containers.add(new SpDataStreamRelayContainer(processor));
+        }
+        return containers;
+    }
+
+    /**
+     * Collects the element ids of all successful element statuses in the given status.
+     *
+     * @param status status whose element statuses are inspected
+     * @return a new set with the ids of the successful elements, without duplicates
+     */
+    private Set<String> extractUniqueRelayIds(PipelineOperationStatus status) {
+        Set<String> successfulIds = new HashSet<>();
+        for (PipelineElementStatus elementStatus : status.getElementStatus()) {
+            if (elementStatus.isSuccess()) {
+                successfulIds.add(elementStatus.getElementId());
+            }
+        }
+        return successfulIds;
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
index c404896..2258cb9 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
@@ -15,16 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-
-import javax.ws.rs.Path;
-
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+/**
+ * Empty placeholder class; it declares no members yet.
+ */
+public class PipelineMigrationHelpers {
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineStorageService.java
similarity index 98%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineStorageService.java
index 9df92ac..23fc3e2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineStorageService.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineStorageService.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.manager.execution.http;
+package org.apache.streampipes.manager.execution.pipeline;
 
 import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphBuilder;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
index c404896..fd4bdd9 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
@@ -15,16 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-
-import javax.ws.rs.Path;
-
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+/**
+ * Operation with an {@link #execute()} and a {@link #rollback()} method.
+ *
+ * NOTE(review): judging by the package and method names this is meant for reversible pipeline
+ * migration steps; no implementation with actual behavior is visible yet, so confirm the intended
+ * contract (e.g. whether rollback may be called without a prior execute).
+ */
+public interface Command {
+    /** Performs the operation. */
+    void execute();
+    /** Presumably reverts what {@link #execute()} did; confirm against implementations. */
+    void rollback();
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
index c404896..4afbf46 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+/**
+ * {@link Command} stub; judging by its name it is intended to start a pipeline element.
+ * Both methods currently have empty bodies and do nothing.
+ */
+public class PipelineElementStartCommand implements Command {
 
-import javax.ws.rs.Path;
+    @Override
+    public void execute() {
 
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+    }
+
+    @Override
+    public void rollback() {
+
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
similarity index 65%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
index bb186e8..5a2bbd6 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
 
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+/**
+ * {@link Command} stub; judging by its name it is intended to stop a pipeline element.
+ * Both methods currently have empty bodies and do nothing.
+ */
+public class PipelineElementStopCommand implements Command {
+    @Override
+    public void execute() {
 
-import javax.ws.rs.Path;
+    }
 
-//@Path("/api/v2/node/element/sepa")
-//public class DataProcessorPipelineElementResource extends InvocableEntityResource<DataProcessorInvocation> {
-//
-//    public DataProcessorPipelineElementResource() {
-//        super(DataProcessorInvocation.class);
-//    }
-//}
+    @Override
+    public void rollback() {
+
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
index c404896..b2e242d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+/**
+ * {@link Command} stub; judging by its name it is intended to start a relay.
+ * Both methods currently have empty bodies and do nothing.
+ */
+public class RelayStartCommand implements Command {
+    @Override
+    public void execute() {
 
-import javax.ws.rs.Path;
+    }
 
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+    @Override
+    public void rollback() {
+
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
similarity index 67%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
index c404896..1938268 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.execution.pipeline.migration;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
+/**
+ * {@link Command} stub; judging by its name it is intended to stop a relay.
+ * Both methods currently have empty bodies and do nothing.
+ */
+public class RelayStopCommand implements Command {
+    @Override
+    public void execute() {
 
-import javax.ws.rs.Path;
+    }
 
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+    @Override
+    public void rollback() {
+
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index f7cae4f..533b8a2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -28,7 +28,7 @@ import org.apache.streampipes.manager.data.PipelineGraphHelpers;
 import org.apache.streampipes.manager.matching.output.OutputSchemaFactory;
 import org.apache.streampipes.manager.matching.output.OutputSchemaGenerator;
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
new file mode 100644
index 0000000..4d3f6d5
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.migration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.manager.execution.pipeline.PipelineMigrationExecutor;
+import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.storage.api.IPipelineStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Migrates the data processors of a stored pipeline to the deployment targets of a desired pipeline.
+ * <p>
+ * Every processor whose deployment target node differs is migrated on its own. A failed migration is
+ * swapped back in the working copy, which is written to storage once all elements were handled.
+ */
+public class PipelineElementMigrationHandler {
+
+    // Shared by all deep copies so that no new mapper has to be built per call
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final PipelineOperationStatus pipelineMigrationStatus;
+    private final Pipeline desiredPipeline;
+    private final Pipeline migrationPipeline;
+    private Pipeline currentPipeline;
+    private final boolean visualize;
+    private final boolean storeStatus;
+    private final boolean monitor;
+
+    /**
+     * Loads the stored pipeline that belongs to the id of the desired pipeline.
+     *
+     * @param desiredPipeline pipeline carrying the requested deployment targets
+     * @param visualize       handed over to the {@link PipelineMigrationExecutor}
+     * @param storeStatus     handed over to the {@link PipelineMigrationExecutor}
+     * @param monitor         handed over to the {@link PipelineMigrationExecutor}
+     */
+    public PipelineElementMigrationHandler(Pipeline desiredPipeline, boolean visualize, boolean storeStatus,
+                                           boolean monitor) {
+        this.pipelineMigrationStatus = new PipelineOperationStatus();
+        this.desiredPipeline = desiredPipeline;
+        // Loaded twice: migrationPipeline is modified element by element, currentPipeline is only
+        // replaced after a successful migration step
+        this.currentPipeline = getPipelineById(desiredPipeline.getPipelineId());
+        this.migrationPipeline = getPipelineById(desiredPipeline.getPipelineId());
+        this.visualize = visualize;
+        this.storeStatus = storeStatus;
+        this.monitor = monitor;
+    }
+
+    /**
+     * Migrates all changed pipeline elements and summarizes the outcome.
+     *
+     * @return status containing the element status of every migration step; it is successful only if
+     * every single element status is successful
+     */
+    public PipelineOperationStatus handlePipelineMigration() {
+        migratePipelineElementOrRollback();
+        return verifyPipelineMigrationStatus(pipelineMigrationStatus,
+                "Successfully migrated Pipeline Elements in Pipeline " + desiredPipeline.getName(),
+                "Could not migrate all Pipeline Elements in Pipeline " + desiredPipeline.getName());
+    }
+
+    /**
+     * Migrates each element of the pipeline delta and overwrites the stored pipeline afterwards.
+     */
+    private void migratePipelineElementOrRollback() {
+        List<PipelineElementMigrationEntity> migrationEntityList = getPipelineDelta(desiredPipeline, migrationPipeline);
+
+        migrationEntityList.forEach(entity -> {
+            swapPipelineElement(migrationPipeline, entity);
+
+            PipelineOperationStatus entityStatus = migratePipelineElement(entity);
+
+            entityStatus.getElementStatus()
+                    .forEach(this.pipelineMigrationStatus::addPipelineElementStatus);
+
+            if (entityStatus.isSuccess()) {
+                try {
+                    currentPipeline = deepCopyPipeline(migrationPipeline);
+                } catch (JsonProcessingException e) {
+                    throw new SpRuntimeException("Could not deep copy pipeline for migration: " + e.getMessage(), e);
+                }
+            } else {
+                // Roll back: an entity with source and target reversed puts the previous element back
+                PipelineElementMigrationEntity failedEntity =
+                        new PipelineElementMigrationEntity(entity.getTargetElement(), entity.getSourceElement());
+
+                swapPipelineElement(migrationPipeline, failedEntity);
+            }
+        });
+
+        Operations.overwritePipeline(migrationPipeline);
+    }
+
+    // Delegates the migration of a single element to a PipelineMigrationExecutor
+    private PipelineOperationStatus migratePipelineElement(PipelineElementMigrationEntity migrationEntity) {
+        return new PipelineMigrationExecutor(migrationPipeline, currentPipeline, migrationEntity, visualize,
+                storeStatus, monitor).migratePipelineElement();
+    }
+
+
+    // Helpers
+
+    private Pipeline getPipelineById(String pipelineId) {
+        return getPipelineStorageApi().getPipeline(pipelineId);
+    }
+
+    private IPipelineStorage getPipelineStorageApi() {
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
+    }
+
+    /**
+     * Collects the processors of pipelineX for which pipelineY holds no processor with the same
+     * element id and the same deployment target node.
+     *
+     * @return entities pairing the processor of pipelineY (matched by running instance id) with the
+     * processor of pipelineX; processors without such a match are left out
+     */
+    private List<PipelineElementMigrationEntity> getPipelineDelta(Pipeline pipelineX, Pipeline pipelineY){
+        List<PipelineElementMigrationEntity> delta = new ArrayList<>();
+        pipelineX.getSepas().forEach(iX -> {
+            if (pipelineY.getSepas().stream().filter(iY -> iY.getElementId().equals(iX.getElementId()))
+                    .noneMatch(iY -> iY.getDeploymentTargetNodeId().equals(iX.getDeploymentTargetNodeId()))){
+                Optional<DataProcessorInvocation> invocationY = pipelineY.getSepas().stream().
+                        filter(iY -> iY.getDeploymentRunningInstanceId().equals(iX.getDeploymentRunningInstanceId())).findFirst();
+                invocationY.ifPresent(dataProcessorInvocation -> delta.add(new PipelineElementMigrationEntity(dataProcessorInvocation, iX)));
+            }
+        });
+        return delta;
+    }
+
+    /**
+     * Marks the status as successful only if every element status is successful and sets the title.
+     */
+    private PipelineOperationStatus verifyPipelineMigrationStatus(PipelineOperationStatus status, String successMessage,
+                                                                  String errorMessage) {
+        //Duplicate from method in GraphSubmitter
+        status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
+        if (status.isSuccess()) {
+            status.setTitle(successMessage);
+        } else {
+            status.setTitle(errorMessage);
+        }
+        return status;
+    }
+
+    /**
+     * Replaces the source element of the entity with its target element at the same list position.
+     * Only data processors are handled; other element types leave the pipeline untouched.
+     *
+     * @throws SpRuntimeException if the source element is not part of the pipeline
+     */
+    private void swapPipelineElement(Pipeline exchangePipeline,
+                                           PipelineElementMigrationEntity migrationEntity){
+        if (migrationEntity.getTargetElement() instanceof DataProcessorInvocation){
+            int index = exchangePipeline.getSepas().indexOf(migrationEntity.getSourceElement());
+            if (index < 0) {
+                throw new SpRuntimeException("Could not find pipeline element to swap in pipeline "
+                        + exchangePipeline.getName());
+            }
+            exchangePipeline.getSepas().set(index, (DataProcessorInvocation) migrationEntity.getTargetElement());
+        }
+    }
+
+    /**
+     * Creates an independent copy of the pipeline by serializing it to JSON and reading it back.
+     *
+     * @throws JsonProcessingException if the pipeline cannot be written to or read from JSON
+     */
+    public static Pipeline deepCopyPipeline(Pipeline object) throws JsonProcessingException {
+        return OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsString(object), Pipeline.class);
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
index b5803d5..dce1e06 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
@@ -17,8 +17,11 @@
  */
 package org.apache.streampipes.manager.node;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.ContentType;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
@@ -30,25 +33,50 @@ public abstract class AbstractClusterManager {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterManager.class.getCanonicalName());
 
     private static final String PROTOCOL = "http://";
-    private static final String SLASH = "/";
     private static final String COLON = ":";
     private static final long RETRY_INTERVAL_MS = 5000;
-    private static final Object BASE_NODE_CONTROLLER_INFO_ROUTE = "/api/v2/node/info";
     private static final int CONNECT_TIMEOUT = 1000;
+    private static final String BASE_NODE_CONTROLLER_INFO_ROUTE = "/api/v2/node/info";
+    private static final String BASE_NODE_CONTROLLER_RELAY_ROUTE = "/api/v2/node/stream/relay";
 
-    protected static boolean syncStateUpdateWithRemoteNodeController(NodeInfoDescription desc, boolean activate) {
+    public enum RequestOptions {
+        GET,POST,PUT,DELETE
+    }
+
+    protected static <T> boolean syncWithNodeController(T element, NodeSyncOptions sync) {
+        switch (sync) {
+            case ACTIVATE_NODE:
+                return sync(element, "/activate", RequestOptions.POST, false, NodeInfoDescription.class);
+            case DEACTIVATE_NODE:
+                return sync(element, "/deactivate", RequestOptions.POST, false, NodeInfoDescription.class);
+            case UPDATE_NODE:
+                return sync(element, "", RequestOptions.PUT, true, NodeInfoDescription.class);
+            case RESTART_RELAYS:
+                return sync(element, "/invoke", RequestOptions.POST, true, SpDataStreamRelayContainer.class);
+            default:
+                return false;
+        }
+    }
+
+    private static <T> boolean sync(T element, String subroute, RequestOptions request, boolean withBody, Class<?> type) {
         boolean synced = false;
-        String url;
-        if (activate) {
-            url = generateEndpoint(desc, "/activate");
-        } else {
-            url = generateEndpoint(desc, "/deactivate");
+        // requests without payload send an empty JSON object
+        String body = "{}";
+        if (withBody) {
+            body = jackson(element);
         }
-        LOG.info("Trying to sync state update with node controller=" + url);
+        // the route is chosen by generateEndpoint based on the given type
+        String url = generateEndpoint(element, subroute, type);
+        LOG.info("Trying to sync with node controller=" + url);
 
         boolean connected = false;
         while (!connected) {
-            connected = post(url);
+            // call node controller REST endpoint; break keeps POST from falling through to PUT
+            switch (request) {
+                case POST: connected = post(url, body); break;
+                case PUT : connected = put(url, body); break;
+                default: throw new IllegalArgumentException("Unsupported request option: " + request);
+            }
             if (!connected) {
                 LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 10000));
                 try {
@@ -57,46 +85,43 @@ public abstract class AbstractClusterManager {
                     e.printStackTrace();
                 }
             }
-        }
-        synced = true;
-        return synced;
-    }
-
-    protected static boolean syncWithRemoteNodeController(NodeInfoDescription desc) {
-        boolean synced = false;
-        try {
-            String body = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
-            String url = generateEndpoint(desc);
-            LOG.info("Trying to sync description updates with node controller=" + url);
-
-            boolean connected = false;
-            while (!connected) {
-                connected = put(url, body);
-                if (!connected) {
-                    LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 10000));
-                    try {
-                        Thread.sleep(RETRY_INTERVAL_MS);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
             synced = true;
-        } catch (IOException e) {
-            e.printStackTrace();
         }
+        LOG.info("Successfully synced with node controller=" + url);
         return synced;
     }
 
-    protected static String generateEndpoint(NodeInfoDescription desc) {
-        return generateEndpoint(desc, "");
+    // Helpers
+
+    private static <T> String generateEndpoint(T desc, String subroute, Class<?> type) {
+        if (type.equals(NodeInfoDescription.class)) {
+            NodeInfoDescription d = (NodeInfoDescription) desc;
+            return PROTOCOL
+                    + d.getHostname()
+                    + COLON
+                    + d.getPort()
+                    + BASE_NODE_CONTROLLER_INFO_ROUTE
+                    + subroute;
+        } else {
+            SpDataStreamRelayContainer d = (SpDataStreamRelayContainer) desc;
+            return PROTOCOL
+                    + d.getDeploymentTargetNodeHostname()
+                    + COLON
+                    + d.getDeploymentTargetNodePort()
+                    + BASE_NODE_CONTROLLER_RELAY_ROUTE
+                    + subroute;
+        }
     }
 
-    protected static String generateEndpoint(NodeInfoDescription desc, String subroute) {
-        return PROTOCOL + desc.getHostname() + COLON + desc.getPort() + BASE_NODE_CONTROLLER_INFO_ROUTE + subroute;
+    private static <T> String jackson(T desc) {
+        try {
+            return JacksonSerializer.getObjectMapper().writeValueAsString(desc);
+        } catch (JsonProcessingException e) {
+            throw new SpRuntimeException("Could not serialize node controller description", e); // keep the cause
+        }
     }
 
-    protected static boolean put(String url, String body) {
+    private static boolean put(String url, String body) {
         try {
             Request.Put(url)
                     .bodyString(body, ContentType.APPLICATION_JSON)
@@ -109,10 +134,10 @@ public abstract class AbstractClusterManager {
         return false;
     }
 
-    protected static boolean post(String url) {
+    private static boolean post(String url, String body) {
         try {
             Request.Post(url)
-                    .bodyString("{}", ContentType.APPLICATION_JSON)
+                    .bodyString(body, ContentType.APPLICATION_JSON)
                     .connectTimeout(CONNECT_TIMEOUT)
                     .execute();
             return true;
@@ -121,4 +146,5 @@ public abstract class AbstractClusterManager {
         }
         return false;
     }
+
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
index c2d9278..2071e12 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
@@ -28,6 +28,7 @@ import java.util.List;
 
 import javax.ws.rs.core.MediaType;
 
+@Deprecated
 public class AvailableNodesFetcher {
 
     public AvailableNodesFetcher() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
index 3f58f2f..007dc15 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
@@ -17,9 +17,13 @@
  */
 package org.apache.streampipes.manager.node;
 
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.message.NotificationType;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.storage.api.INodeDataStreamRelay;
+import org.apache.streampipes.storage.api.INodeInfoStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,70 +32,132 @@ import java.util.List;
 import java.util.Optional;
 
 public class NodeClusterManager extends AbstractClusterManager {
-
     private static final Logger LOG = LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
 
+
     public static List<NodeInfoDescription> getAvailableNodes() {
         //return new AvailableNodesFetcher().fetchNodes();
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllActiveNodes();
+        return getNodeStorageApi().getAllActiveNodes();
     }
 
+
     public static List<NodeInfoDescription> getAllNodes() {
         //return new AvailableNodesFetcher().fetchNodes();
-        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
+        return getNodeStorageApi().getAllNodes();
     }
 
     public static Message updateNode(NodeInfoDescription desc) {
-        boolean successfullyUpdated = syncWithRemoteNodeController(desc);
+        boolean successfullyUpdated = syncWithNodeController(desc, NodeSyncOptions.UPDATE_NODE);
         if (successfullyUpdated) {
-            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().updateNode(desc);
+            getNodeStorageApi().updateNode(desc);
             return Notifications.success("Node updated");
         }
         return Notifications.error("Could not update node");
     }
 
     public static boolean deactivateNode(String nodeControllerId) {
-        Optional<NodeInfoDescription> storedNode =
-                StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(nodeControllerId);
+        Optional<NodeInfoDescription> storedNode = getNodeStorageApi().getNode(nodeControllerId);
         boolean status = false;
         if (storedNode.isPresent()) {
-            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deactivateNode(nodeControllerId);
-            status = syncStateUpdateWithRemoteNodeController(storedNode.get(), false);
+            getNodeStorageApi().deactivateNode(nodeControllerId);
+            status = syncWithNodeController(storedNode.get(), NodeSyncOptions.DEACTIVATE_NODE);
         }
         return status;
     }
 
     public static boolean activateNode(String nodeControllerId) {
-        Optional<NodeInfoDescription> storedNode =
-                StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(nodeControllerId);
+        Optional<NodeInfoDescription> storedNode = getNodeStorageApi().getNode(nodeControllerId);
         boolean status = false;
         if (storedNode.isPresent()) {
-            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().activateNode(nodeControllerId);
-            status = syncStateUpdateWithRemoteNodeController(storedNode.get(), true);
+            getNodeStorageApi().activateNode(nodeControllerId);
+            status = syncWithNodeController(storedNode.get(), NodeSyncOptions.ACTIVATE_NODE);
         }
         return status;
     }
 
-    public static void addNode(NodeInfoDescription desc) {
-        List<NodeInfoDescription> allNodes =
-                StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getAllNodes();
+    public static Message addOrRejoin(NodeInfoDescription desc) {
+        Optional<NodeInfoDescription> latestDesc = getLatestNodeOrElseEmpty(desc.getNodeControllerId());
 
         boolean alreadyRegistered = false;
-        if (allNodes.size() > 0) {
-            alreadyRegistered = allNodes.stream()
-                    .anyMatch(n -> n.getNodeControllerId().equals(desc.getNodeControllerId()));
+        if (latestDesc.isPresent()) {
+            alreadyRegistered = true;
         }
 
         if (!alreadyRegistered) {
-            LOG.info("New cluster node join registration request on from http://{}:{}", desc.getHostname(), desc.getPort());
-            StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().storeNode(desc);
-            LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
+            LOG.info("New cluster node join request from http://{}:{}", desc.getHostname(), desc.getPort());
+            return addNewNode(desc);
         } else {
             LOG.info("Re-joined cluster node from http://{}:{}", desc.getHostname(), desc.getPort());
+            return rejoinAndSyncNode(latestDesc.get());
+        }
+    }
+
+    private static Optional<NodeInfoDescription> getLatestNodeOrElseEmpty(String nodeControllerId) {
+        return getNodeStorageApi().getAllNodes().stream()
+                .filter(n -> n.getNodeControllerId().equals(nodeControllerId))
+                .findAny();
+    }
+
+    private static Message addNewNode(NodeInfoDescription desc) throws RuntimeException {
+        try {
+            getNodeStorageApi().storeNode(desc);
+            LOG.info("New cluster node successfully joined http://{}:{}", desc.getHostname(), desc.getPort());
+            return Notifications.success(NotificationType.NODE_JOIN_SUCCESS);
+        } catch (Exception e) {
+            return Notifications.error(NotificationType.NODE_JOIN_ERROR); // a failed store is an error, not a success
+        }
+    }
+
+    private static Message rejoinAndSyncNode(NodeInfoDescription desc) {
+        LOG.info("Sync latest node description to http://{}:{}", desc.getHostname(), desc.getPort());
+        boolean success = syncWithNodeController(desc, NodeSyncOptions.UPDATE_NODE);
+        if (success) {
+            return restartRelays(desc);
+        }
+        return Notifications.error(NotificationType.NODE_JOIN_ERROR); // failed sync is reported as an error
+    }
+
+    private static Message restartRelays(NodeInfoDescription desc) {
+        List<SpDataStreamRelayContainer> runningRelays = getDataStreamRelay(desc.getNodeControllerId());
+        if (runningRelays.size() > 0) {
+            runningRelays.forEach(relay -> {
+                LOG.info("Sync active relays name={} to http://{}:{}", relay.getName(), desc.getHostname(),
+                        desc.getPort());
+                syncWithNodeController(relay, NodeSyncOptions.RESTART_RELAYS);
+            });
         }
+        return Notifications.success(NotificationType.NODE_JOIN_SUCCESS);
     }
 
     public static void deleteNode(String nodeControllerId) {
-        StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().deleteNode(nodeControllerId);
+        getNodeStorageApi().deleteNode(nodeControllerId);
     }
+
+
+    public static void persistDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
+        getNodeDataStreamRelayStorageApi().addRelayContainer(relayContainer);
+    }
+
+    public static List<SpDataStreamRelayContainer> getDataStreamRelay(String nodeControllerId) {
+        return getNodeDataStreamRelayStorageApi().getAllByNodeControllerId(nodeControllerId);
+    }
+
+    public static void updateDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
+        getNodeDataStreamRelayStorageApi().updateRelayContainer(relayContainer);
+    }
+
+    public static void deleteDataStreamRelay(SpDataStreamRelayContainer relayContainer) {
+        getNodeDataStreamRelayStorageApi().deleteRelayContainer(relayContainer);
+    }
+
+    // Helpers
+
+    private static INodeInfoStorage getNodeStorageApi() {
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage();
+    }
+
+    private static INodeDataStreamRelay getNodeDataStreamRelayStorageApi(){
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeDataStreamRelayStorage();
+    }
+
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
similarity index 67%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
index c404896..644e688 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
@@ -15,16 +15,8 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.manager.node;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-
-import javax.ws.rs.Path;
-
-//@Path("/api/v2/node/element/sec")
-//public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
-//
-//    public DataSinkPipelineElementResource() {
-//        super(DataSinkInvocation.class);
-//    }
-//}
+public enum NodeSyncOptions {
+    ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE,RESTART_RELAYS;
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 7861a6f..6047b1d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -18,16 +18,15 @@
 
 package org.apache.streampipes.manager.operations;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.manager.endpoint.EndpointItemFetcher;
-import org.apache.streampipes.manager.execution.http.MigrationHelpers;
-import org.apache.streampipes.manager.execution.http.PipelineExecutor;
-import org.apache.streampipes.manager.execution.http.PipelineStorageService;
+import org.apache.streampipes.manager.execution.pipeline.PipelineExecutor;
+import org.apache.streampipes.manager.execution.pipeline.PipelineStorageService;
 import org.apache.streampipes.manager.matching.DataSetGroundingSelector;
 import org.apache.streampipes.manager.matching.PipelineVerificationHandler;
+import org.apache.streampipes.manager.migration.PipelineElementMigrationHandler;
 import org.apache.streampipes.manager.recommender.ElementRecommender;
 import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
 import org.apache.streampipes.manager.runtime.PipelineElementRuntimeInfoFetcher;
@@ -38,28 +37,20 @@ import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
 import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
 import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.message.DataSetModificationMessage;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.message.PipelineModificationMessage;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementRecommendationMessage;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.pipeline.*;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
 import org.apache.streampipes.model.template.PipelineTemplateDescription;
 import org.apache.streampipes.model.template.PipelineTemplateInvocation;
-import org.apache.streampipes.sdk.helpers.Tuple2;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
 
 
 /**
@@ -196,35 +187,9 @@ public class Operations {
     return PipelineElementRuntimeInfoFetcher.INSTANCE.getCurrentData(spDataStream);
   }
 
-  public static PipelineOperationStatus migratePipelineProcessors(Pipeline newPipeline, boolean visualize, boolean storeStatus,
-                                                                  boolean monitor) throws SpRuntimeException{
-
-
-    Pipeline currentPipeline = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getPipeline(newPipeline.getPipelineId());
-    Pipeline migrationPipeline = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getPipeline(newPipeline.getPipelineId());
-    PipelineOperationStatus status = new PipelineOperationStatus();
-
-    for(Tuple2<DataProcessorInvocation, DataProcessorInvocation> t : MigrationHelpers.getDelta(newPipeline, migrationPipeline)){
-      MigrationHelpers.exchangePipelineElement(migrationPipeline, t);
-
-      PipelineOperationStatus migrationStatus = new PipelineExecutor(migrationPipeline, visualize, storeStatus, monitor)
-              .migratePipelineElement(currentPipeline, t);
-      migrationStatus.getElementStatus().forEach(status::addPipelineElementStatus);
-      if (migrationStatus.isSuccess()) {
-        try {
-          currentPipeline = MigrationHelpers.deepCopyPipeline(migrationPipeline);
-        } catch (JsonProcessingException e) {
-          throw new SpRuntimeException(e);
-        }
-      } else {
-        Tuple2<DataProcessorInvocation, DataProcessorInvocation> failedMigration = new Tuple2<>(t.b, t.a);
-        MigrationHelpers.exchangePipelineElement(migrationPipeline, failedMigration);
-      }
-    }
-
-    overwritePipeline(migrationPipeline);
-    return MigrationHelpers.verifyPipelineOperationStatus(status,
-            "Successfully migrated Pipeline Elements in Pipeline " + newPipeline.getName(),
-            "Could not migrate all Pipeline Elements in Pipeline " + newPipeline.getName());
+  public static PipelineOperationStatus handlePipelineElementMigration(Pipeline desiredPipeline, boolean visualize,
+                                                                       boolean storeStatus, boolean monitor) throws SpRuntimeException{
+    return new PipelineElementMigrationHandler(desiredPipeline, visualize, storeStatus, monitor).handlePipelineMigration();
   }
+
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
index d0f8c9f..e7db8e0 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Node.java
@@ -40,8 +40,7 @@ public class Node extends AbstractRestResource implements INode {
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response addNode(@PathParam("username") String username, NodeInfoDescription desc) {
-        NodeClusterManager.addNode(desc);
-        return statusMessage(Notifications.success(NotificationType.STORAGE_SUCCESS));
+        return statusMessage(NodeClusterManager.addOrRejoin(desc));
     }
 
     @PUT
@@ -51,7 +50,8 @@ public class Node extends AbstractRestResource implements INode {
     @Produces(MediaType.APPLICATION_JSON)
     @Override
     public Response updateNode(@PathParam("username") String username,
-                               @PathParam("nodeControllerId") String nodeControllerId, NodeInfoDescription desc) {
+                               @PathParam("nodeControllerId") String nodeControllerId,
+                               NodeInfoDescription desc) {
         return statusMessage(NodeClusterManager.updateNode(desc));
     }
 
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index 28c0dee..e8f005e 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -270,11 +270,12 @@ public class PipelineResource extends AbstractAuthGuardedRestResource {
   @JacksonSerialized
   @Operation(summary = "Migrate pipeline elements to new node",
           tags = {"Pipeline"})
-  public Response migratePipelineProcessors(@PathParam("username") String username,
-                                            @PathParam("pipelineId") String pipelineId,
-                                            Pipeline pipelineNew) {
+  public Response migratePipelineElements(@PathParam("username") String username,
+                                          @PathParam("pipelineId") String pipelineId,
+                                          Pipeline desiredPipeline) {
     try {
-      PipelineOperationStatus status = Operations.migratePipelineProcessors(pipelineNew, true, true, true);
+      PipelineOperationStatus status = Operations.handlePipelineElementMigration(desiredPipeline,
+              true, true, true);
       return ok(status);
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java b/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
index cf056c2..8a6f628 100644
--- a/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
+++ b/streampipes-serializers-json/src/main/java/org/apache/streampipes/serializers/json/GsonSerializer.java
@@ -24,6 +24,8 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.streampipes.model.*;
 import org.apache.streampipes.model.connect.rules.value.*;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.connect.adapter.*;
 import org.apache.streampipes.model.connect.rules.schema.CreateNestedRuleDescription;
diff --git a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
index b9ee0fa..d3da942 100644
--- a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
+++ b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
@@ -41,6 +41,8 @@ import org.apache.streampipes.model.dashboard.DashboardWidgetModel;
 import org.apache.streampipes.model.dashboard.VisualizablePipeline;
 import org.apache.streampipes.model.datalake.DataExplorerWidgetModel;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.graph.*;
 import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
diff --git a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
index 55b4781..84ae05e 100644
--- a/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
+++ b/streampipes-serializers-jsonld/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
@@ -70,6 +70,7 @@ public class JsonLdTransformer implements RdfTransformer {
           StreamPipes.DASHBOARD_WIDGET_MODEL,
           StreamPipes.DASHBOARD_MODEL,
           StreamPipes.DATA_EXPLORER_WIDGET_MODEL,
+          StreamPipes.DATA_STREAM_RELAY,
           StreamPipes.DATA_STREAM_RELAY_CONTAINER,
           StreamPipes.NODE_INFO_DESCRIPTION,
           StreamPipes.DEPLOYMENT_CONTAINER,
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
index 06fdfea..4d84cca 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
@@ -58,4 +58,6 @@ public interface INoSqlStorage {
   IPipelineElementTemplateStorage getPipelineElementTemplateStorage();
 
   INodeInfoStorage getNodeStorage();
+
+  INodeDataStreamRelay getNodeDataStreamRelayStorage();
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeDataStreamRelay.java
similarity index 56%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
copy to streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeDataStreamRelay.java
index 0ad32ee..67e052b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementLifeCycle.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INodeDataStreamRelay.java
@@ -15,19 +15,25 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.node.controller.container.management.pe;
+package org.apache.streampipes.storage.api;
 
-import org.apache.streampipes.container.model.node.InvocableRegistration;
-import org.apache.streampipes.model.Response;
 
-public interface PipelineElementLifeCycle {
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
 
-    void register(InvocableRegistration registration);
+import java.util.List;
+import java.util.Optional;
 
-    Response invoke(String endpoint, String payload);
+public interface INodeDataStreamRelay {
 
-    Response detach(String runningInstanceId);
+    List<SpDataStreamRelayContainer> getAll();
 
-    void unregister();
+    List<SpDataStreamRelayContainer> getAllByNodeControllerId(String id);
 
+    void addRelayContainer(SpDataStreamRelayContainer relayContainer);
+
+    Optional<SpDataStreamRelayContainer> getRelayContainerById(String id);
+
+    void updateRelayContainer(SpDataStreamRelayContainer relayContainer);
+
+    void deleteRelayContainer(SpDataStreamRelayContainer relayContainer);
 }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
index 269fb42..739c22b 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
@@ -120,5 +120,8 @@ public enum CouchDbStorageManager implements INoSqlStorage {
     return new NodeInfoStorageImpl();
   }
 
-
+  @Override
+  public INodeDataStreamRelay getNodeDataStreamRelayStorage() {
+    return new NodeDataStreamRelayImpl(); // a new storage object is constructed on every call
+  }
 }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeDataStreamRelayImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeDataStreamRelayImpl.java
new file mode 100644
index 0000000..2564418
--- /dev/null
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NodeDataStreamRelayImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.storage.couchdb.impl;
+
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.storage.api.INodeDataStreamRelay;
+import org.apache.streampipes.storage.couchdb.dao.AbstractDao;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class NodeDataStreamRelayImpl extends AbstractDao<SpDataStreamRelayContainer> implements INodeDataStreamRelay {
+
+    public NodeDataStreamRelayImpl() {
+        super(Utils::getCouchDbNodeDataStreamRelayClient, SpDataStreamRelayContainer.class);
+    }
+
+    @Override
+    public List<SpDataStreamRelayContainer> getAll() {
+        return findAll();
+    }
+
+    // Null-safe: a stored container without a deployment target node id never matches instead of throwing.
+    @Override
+    public List<SpDataStreamRelayContainer> getAllByNodeControllerId(String id) {
+        return getAll().stream()
+                .filter(e -> id != null && id.equals(e.getDeploymentTargetNodeId()))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void addRelayContainer(SpDataStreamRelayContainer relayContainer) {
+        persist(relayContainer);
+    }
+
+    // The id compared here is the running stream relay instance id, not the CouchDB document id; null-safe.
+    @Override
+    public Optional<SpDataStreamRelayContainer> getRelayContainerById(String s) {
+        return getAll().stream()
+                .filter(e -> s != null && s.equals(e.getRunningStreamRelayInstanceId()))
+                .findAny();
+    }
+
+    // Copies CouchDB id/rev from the stored container; a no-op when none has the same running instance id.
+    @Override
+    public void updateRelayContainer(SpDataStreamRelayContainer relayContainer) {
+        getRelayContainerById(relayContainer.getRunningStreamRelayInstanceId()).ifPresent(stored -> {
+            relayContainer.setCouchDbId(stored.getCouchDbId());
+            relayContainer.setCouchDbRev(stored.getCouchDbRev());
+            update(relayContainer);
+        });
+    }
+
+    @Override
+    public void deleteRelayContainer(SpDataStreamRelayContainer relayContainer) {
+        getRelayContainerById(relayContainer.getRunningStreamRelayInstanceId())
+                .ifPresent(stored -> delete(stored.getCouchDbId()));
+    }
+}
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
index c1464ba..be64b45 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
@@ -144,6 +144,12 @@ public class Utils {
     return dbClient;
   }
 
+  public static CouchDbClient getCouchDbNodeDataStreamRelayClient() {
+    CouchDbClient dbClient = new CouchDbClient(props("relays")); // new client per call; presumably "relays" is the database name - confirm in props(...)
+    dbClient.setGsonBuilder(GsonSerializer.getGsonBuilder()); // install the GsonBuilder supplied by GsonSerializer on this client
+    return dbClient;
+  }
+
   public static CouchDbClient getCouchDbInternalUsersClient() {
     CouchDbClient dbClient = new CouchDbClient(props("_users"));
     return dbClient;
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index f34d68a..2587943 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,10 +19,10 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2021-02-23 09:17:52.
+// Generated using typescript-generator version 2.27.744 on 2021-02-26 19:57:32.
 
 export class AbstractStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
+    "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
     elementId: string;
 
     static fromData(data: AbstractStreamPipesEntity, target?: AbstractStreamPipesEntity): AbstractStreamPipesEntity {
@@ -119,7 +119,7 @@ export class Accuracy extends EventPropertyQualityDefinition {
 }
 
 export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+    "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
     appId: string;
     applicationLinks: ApplicationLink[];
     connectedTo: string[];
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
         instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
         instance.internallyManaged = data.internallyManaged;
         instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
-        instance.dom = data.dom;
         instance.uri = data.uri;
+        instance.dom = data.dom;
         return instance;
     }
 }
@@ -1824,8 +1824,8 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
         }
         const instance = target || new GenericAdapterSetDescription();
         super.fromData(data, instance);
-        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
@@ -1843,8 +1843,8 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
         }
         const instance = target || new GenericAdapterStreamDescription();
         super.fromData(data, instance);
-        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
@@ -3074,7 +3074,7 @@ export class SpDataSet extends SpDataStream {
 }
 
 export class SpDataStreamRelay extends NamedStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.SpDataStreamRelay";
+    "@class": "org.apache.streampipes.model.eventrelay.SpDataStreamRelay";
     eventGrounding: EventGrounding;
 
     static fromData(data: SpDataStreamRelay, target?: SpDataStreamRelay): SpDataStreamRelay {
diff --git a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
index 4014443..8c27f41 100644
--- a/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
+++ b/ui/src/app/pipelines/dialog/pipeline-status/pipeline-status-dialog.component.ts
@@ -29,7 +29,7 @@ import {Component, Input} from "@angular/core";
 export class PipelineStatusDialogComponent {
 
     statusDetailsVisible: any;
-    elementStati : [[PipelineElementStatus]];
+    elementStati : Array<Array<PipelineElementStatus>>;
 
     @Input()
     pipelineOperationStatus: PipelineOperationStatus;
@@ -40,18 +40,15 @@ export class PipelineStatusDialogComponent {
     }
 
     ngOnInit(){
-        console.log(this.pipelineOperationStatus.elementStatus)
-        let nodes: [String];
-        nodes = [];
+        let nodes = [];
         this.pipelineOperationStatus.elementStatus.forEach(stat => {
             if (!nodes.includes(stat.elementNode)){
                 nodes.push(stat.elementNode)
             }
         })
-        console.log(nodes)
+
         nodes.forEach(node =>{
-            let nodeStati : [PipelineElementStatus];
-            nodeStati = [];
+            let nodeStati = [];
             this.pipelineOperationStatus.elementStatus.forEach(stat =>{
                 if(stat.elementNode == node){
                     nodeStati.push(stat);
@@ -59,7 +56,6 @@ export class PipelineStatusDialogComponent {
             })
             this.elementStati.push(nodeStati);
         })
-        console.log(this.elementStati)
     }
 
     close() {