You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/12/19 21:45:43 UTC

[incubator-streampipes] branch edge-extensions updated: [WIP] initial support for data stream relays from edge nodes

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new 9f4168b  [WIP] initial support for data stream relays from edge nodes
9f4168b is described below

commit 9f4168b79d7500b0435ed611a1d9733eb7d6743e
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Sat Dec 19 22:45:20 2020 +0100

    [WIP] initial support for data stream relays from edge nodes
---
 .../extensions/ExtensionsModelSubmitter.java       |  12 +-
 .../streampipes/model/SpDataStreamRelay.java       |   1 +
 .../model/SpDataStreamRelayContainer.java          | 139 ++++++++
 .../model/graph/DataProcessorInvocation.java       |   2 +
 .../streampipes/model/pipeline/Pipeline.java       |  10 +
 .../management/pe/InvocableElementManager.java     |   2 +
 .../{EventRelayManager.java => EventRelay.java}    |   6 +-
 .../management/relay/RunningRelayInstances.java    |  23 +-
 .../relay/bridges/MultiBrokerBridge.java           |   1 +
 .../container/rest/DataStreamRelayResource.java    |  80 +++++
 .../container/rest/DebugRelayResource.java         |  57 ---
 .../container/rest/InfoStatusResource.java         |   4 +-
 .../container/rest/InvocableEntityResource.java    |  57 ++-
 .../rest/NodeControllerResourceConfig.java         |   4 +-
 .../manager/execution/http/GraphSubmitter.java     |  12 +-
 .../manager/execution/http/HttpRequestBuilder.java |  25 +-
 .../manager/execution/http/PipelineExecutor.java   | 146 +++++---
 .../http/StreamRelayEndpointUrlGenerator.java      |  55 +++
 .../manager/matching/InvocationGraphBuilder.java   | 381 +++++++++++++--------
 .../jsonld/CustomAnnotationProvider.java           |   3 +-
 .../serializers/jsonld/JsonLdTransformer.java      |   3 +-
 .../apache/streampipes/vocabulary/StreamPipes.java |   2 +
 ui/src/app/core-model/gen/streampipes-model.ts     |   4 +-
 .../save-pipeline/save-pipeline.component.html     |  46 ---
 .../save-pipeline/save-pipeline.component.ts       |   2 +
 25 files changed, 729 insertions(+), 348 deletions(-)

diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 82983c7..0c28bb6 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -64,6 +64,10 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<Extensions
     private static final String SLASH = "/";
     private static final String COLON = ":";
 
+    private static final String NODE_CONTROLLER_ID = "SP_NODE_CONTROLLER_ID";
+    private static final String NODE_CONTROLLER_CONTAINER_HOST = "SP_NODE_CONTROLLER_CONTAINER_HOST";
+    private static final String NODE_CONTROLLER_CONTAINER_PORT = "SP_NODE_CONTROLLER_CONTAINER_PORT";
+
     public void init(ExtensionsConfig conf) {
         DeclarersSingleton.getInstance().setHostName(conf.getHost());
         DeclarersSingleton.getInstance().setPort(conf.getPort());
@@ -89,7 +93,7 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<Extensions
         String adapterUrl = PROTOCOL + conf.getHost() + COLON + conf.getPort() + SLASH;
 
         // check wether pipeline element is managed by node controller
-        if (System.getenv("SP_NODE_CONTROLLER_ID") != null) {
+        if (System.getenv(NODE_CONTROLLER_ID) != null) {
             // secondary
             // register pipeline element service via node controller
             NodeControllerUtil.register(
@@ -142,9 +146,9 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<Extensions
     private ConnectWorkerContainer getContainerDescription(String endpointUrl, boolean runsOnEdgeNode) {
 
         if (runsOnEdgeNode) {
-            String deploymentTargetNodeId = System.getenv("SP_NODE_CONTROLLER_ID");
-            String deploymentTargetNodeHostname = System.getenv("SP_NODE_CONTROLLER_CONTAINER_HOST");
-            int deploymentTargetNodePort = Integer.parseInt(System.getenv("SP_NODE_CONTROLLER_CONTAINER_PORT"));
+            String deploymentTargetNodeId = System.getenv(NODE_CONTROLLER_ID);
+            String deploymentTargetNodeHostname = System.getenv(NODE_CONTROLLER_CONTAINER_HOST);
+            int deploymentTargetNodePort = Integer.parseInt(System.getenv(NODE_CONTROLLER_CONTAINER_PORT));
 
             List<AdapterDescription> adapters = new ArrayList<>();
             for (Adapter a : AdapterDeclarerSingleton.getInstance().getAllAdapters()) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
index 4486dac..addbd55 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelay.java
@@ -59,6 +59,7 @@ public class SpDataStreamRelay extends NamedStreamPipesEntity {
     }
 
     public SpDataStreamRelay(EventGrounding eventGrounding) {
+        super(prefix + RandomStringUtils.randomAlphabetic(6));
         this.eventGrounding = eventGrounding;
     }
 
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java
new file mode 100644
index 0000000..cb1cd40
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/SpDataStreamRelayContainer.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model;
+
+import io.fogsy.empire.annotations.RdfProperty;
+import io.fogsy.empire.annotations.RdfsClass;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.vocabulary.StreamPipes;
+
+import javax.persistence.CascadeType;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.OneToOne;
+import java.util.ArrayList;
+import java.util.List;
+
+
+@RdfsClass(StreamPipes.DATA_STREAM_RELAY_CONTAINER)
+@Entity
+public class SpDataStreamRelayContainer extends NamedStreamPipesEntity {
+
+    private static final long serialVersionUID = -4675162465357705480L;
+
+    private static final String prefix = "urn:apache.org:relaystreamcontainer:";
+
+    @OneToOne(fetch = FetchType.EAGER,
+            cascade = {CascadeType.PERSIST, CascadeType.MERGE})
+    @RdfProperty(StreamPipes.HAS_GROUNDING)
+    protected EventGrounding inputGrounding;
+
+    @OneToOne(fetch = FetchType.EAGER,
+            cascade = {CascadeType.PERSIST, CascadeType.MERGE})
+    @RdfProperty(StreamPipes.HAS_EVENT_RELAY)
+    private List<SpDataStreamRelay> outputStreamRelays;
+
+    @RdfProperty(StreamPipes.HAS_EVENT_RELAY_STRATEGY)
+    private String eventRelayStrategy;
+
+    @RdfProperty(StreamPipes.DATA_STREAM_RELAY_RUNNING_INSTANCE_ID)
+    private String runningStreamRelayInstanceId;
+
+    @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_ID)
+    private String deploymentTargetNodeId;
+
+    @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_HOSTNAME)
+    private String deploymentTargetNodeHostname;
+
+    @RdfProperty(StreamPipes.DEPLOYMENT_TARGET_NODE_PORT)
+    private Integer deploymentTargetNodePort;
+
+    public SpDataStreamRelayContainer() {
+        super(prefix + RandomStringUtils.randomAlphabetic(6));
+        this.outputStreamRelays = new ArrayList<>();
+    }
+
+    public SpDataStreamRelayContainer(String elementId, List<SpDataStreamRelay> outputStreamRelays,
+                                      String eventRelayStrategy) {
+        super(elementId);
+        this.outputStreamRelays = outputStreamRelays;
+        this.eventRelayStrategy = eventRelayStrategy;
+    }
+
+    public SpDataStreamRelayContainer(NamedStreamPipesEntity other) {
+        super(other);
+    }
+
+    public EventGrounding getInputGrounding() {
+        return inputGrounding;
+    }
+
+    public void setInputGrounding(EventGrounding inputGrounding) {
+        this.inputGrounding = inputGrounding;
+    }
+
+    public List<SpDataStreamRelay> getOutputStreamRelays() {
+        return outputStreamRelays;
+    }
+
+    public void setOutputStreamRelays(List<SpDataStreamRelay> outputStreamRelays) {
+        this.outputStreamRelays = outputStreamRelays;
+    }
+
+    public String getRunningStreamRelayInstanceId() {
+        return runningStreamRelayInstanceId;
+    }
+
+    public void setRunningStreamRelayInstanceId(String runningStreamRelayInstanceId) {
+        this.runningStreamRelayInstanceId = runningStreamRelayInstanceId;
+    }
+
+    public String getEventRelayStrategy() {
+        return eventRelayStrategy;
+    }
+
+    public void setEventRelayStrategy(String eventRelayStrategy) {
+        this.eventRelayStrategy = eventRelayStrategy;
+    }
+
+    public String getDeploymentTargetNodeId() {
+        return deploymentTargetNodeId;
+    }
+
+    public void setDeploymentTargetNodeId(String deploymentTargetNodeId) {
+        this.deploymentTargetNodeId = deploymentTargetNodeId;
+    }
+
+    public String getDeploymentTargetNodeHostname() {
+        return deploymentTargetNodeHostname;
+    }
+
+    public void setDeploymentTargetNodeHostname(String deploymentTargetNodeHostname) {
+        this.deploymentTargetNodeHostname = deploymentTargetNodeHostname;
+    }
+
+    public Integer getDeploymentTargetNodePort() {
+        return deploymentTargetNodePort;
+    }
+
+    public void setDeploymentTargetNodePort(Integer deploymentTargetNodePort) {
+        this.deploymentTargetNodePort = deploymentTargetNodePort;
+    }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index bd168c9..016f3c7 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -174,6 +174,8 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
 
   public boolean addOutputStreamRelay(SpDataStreamRelay spDataStreamRelay) { return outputStreamRelays.add(spDataStreamRelay); }
 
+  public boolean removeOutputStreamRelay(SpDataStreamRelay spDataStreamRelay) { return outputStreamRelays.remove(spDataStreamRelay); }
+
   public List<SpDataStreamRelay> getOutputStreamRelays() {
     return outputStreamRelays;
   }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
index 5de1173..58774ca 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
@@ -44,6 +44,7 @@ public class Pipeline extends ElementComposition {
   private boolean publicElement;
 
   private String createdByUser;
+  private String eventRelayStrategy;
 
   private List<String> pipelineCategories;
 
@@ -133,6 +134,14 @@ public class Pipeline extends ElementComposition {
     this.createdAt = createdAt;
   }
 
+  public String getEventRelayStrategy() {
+    return eventRelayStrategy;
+  }
+
+  public void setEventRelayStrategy(String eventRelayStrategy) {
+    this.eventRelayStrategy = eventRelayStrategy;
+  }
+
   public Pipeline clone() {
     Pipeline pipeline = new Pipeline();
     pipeline.setName(name);
@@ -145,6 +154,7 @@ public class Pipeline extends ElementComposition {
     pipeline.setCreatedAt(createdAt);
     pipeline.setPipelineId(pipelineId);
     pipeline.setRev(rev);
+    pipeline.setEventRelayStrategy(eventRelayStrategy);
     return pipeline;
   }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index 854e4a6..3b57df8 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -69,6 +69,8 @@ public class InvocableElementManager implements ElementLifeCyle {
             NodeInfoStorage.getInstance()
                     .retrieveNodeInfo()
                     .setSupportedPipelineElementAppIds(registration.getSupportedPipelineElementAppIds());
+
+            LOG.info("Successfully registered pipeline element container");
         } catch (IOException e) {
             e.printStackTrace();
         }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
similarity index 86%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
index 84882bf..7c52867 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelay.java
@@ -21,15 +21,15 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
 
-public class EventRelayManager extends BaseEventRelay {
+public class EventRelay extends BaseEventRelay {
 
     private static final String DEFAULT_EVENT_RELAY_STRATEGY = "buffer";
 
-    public EventRelayManager(TransportProtocol source, TransportProtocol target) {
+    public EventRelay(TransportProtocol source, TransportProtocol target) {
         super(source, target, DEFAULT_EVENT_RELAY_STRATEGY);
     }
 
-    public EventRelayManager(TransportProtocol source, TransportProtocol target, String relayStrategy) {
+    public EventRelay(TransportProtocol source, TransportProtocol target, String relayStrategy) {
         super(source, target, relayStrategy);
     }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index a3d1649..e95ab57 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -19,21 +19,18 @@ package org.apache.streampipes.node.controller.container.management.relay;
 
 import org.apache.streampipes.node.controller.container.management.IRunningInstances;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
-public enum RunningRelayInstances implements IRunningInstances<EventRelayManager> {
+public enum RunningRelayInstances implements IRunningInstances<Map<String,EventRelay>> {
     INSTANCE;
 
-    private final Map<String, EventRelayManager> runningInstances = new HashMap<>();
+    private final Map<String, Map<String, EventRelay>> runningInstances = new HashMap<>();
 
     // TODO: persist active relays to support failure handling
     @Override
-    public void add(String id, EventRelayManager eventRelayManager) {
-        runningInstances.put(id, eventRelayManager);
+    public void add(String id, Map<String,EventRelay> eventRelayMap) {
+        runningInstances.put(id, eventRelayMap);
     }
 
     @Override
@@ -42,7 +39,7 @@ public enum RunningRelayInstances implements IRunningInstances<EventRelayManager
     }
 
     @Override
-    public EventRelayManager get(String id) {
+    public Map<String,EventRelay> get(String id) {
         return isRunning(id) ? runningInstances.get(id) : null;
     }
 
@@ -51,7 +48,11 @@ public enum RunningRelayInstances implements IRunningInstances<EventRelayManager
         runningInstances.remove(id);
     }
 
-    public List<EventRelayManager> getRunningInstances() {
-        return new ArrayList<>(runningInstances.values());
+    public List<EventRelay> getRunningInstances() {
+        return runningInstances.values().stream()
+                .map(Map::values)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+        //return new ArrayList<>(runningInstances.values().forEach(e -> e.values()));
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
index 3bc1af6..cc7c0c5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/bridges/MultiBrokerBridge.java
@@ -21,6 +21,7 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.messaging.EventConsumer;
 import org.apache.streampipes.messaging.EventProducer;
 import org.apache.streampipes.messaging.EventRelay;
+import org.apache.streampipes.model.Response;
 import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DataStreamRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DataStreamRelayResource.java
new file mode 100644
index 0000000..7de261a
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DataStreamRelayResource.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.container.rest;
+
+import org.apache.streampipes.container.transform.Transformer;
+import org.apache.streampipes.container.util.Util;
+import org.apache.streampipes.model.Response;
+import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.node.controller.container.management.relay.EventRelay;
+import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.util.HashMap;
+import java.util.Map;
+
+@Path("/api/v2/node/stream/relay")
+public class DataStreamRelayResource extends AbstractNodeContainerResource {
+    private static final Logger LOG = LoggerFactory.getLogger(DataStreamRelayResource.class.getCanonicalName());
+
+    @POST
+    @Path("/invoke")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public String invokeRelaySourceDataStream(SpDataStreamRelayContainer graph) {
+
+        String strategy = graph.getEventRelayStrategy();
+        String runningInstanceId = graph.getRunningStreamRelayInstanceId();
+        TransportProtocol source = graph.getInputGrounding().getTransportProtocol();
+
+        Map<String, EventRelay> eventRelayMap = new HashMap<>();
+
+        graph.getOutputStreamRelays().forEach(r -> {
+            TransportProtocol target = r.getEventGrounding().getTransportProtocol();
+
+            EventRelay eventRelay = new EventRelay(source, target, strategy);
+            eventRelay.start();
+            eventRelayMap.put(r.getElementId(), eventRelay);
+        });
+
+        RunningRelayInstances.INSTANCE.add(graph.getRunningStreamRelayInstanceId(), eventRelayMap);
+
+        return Util.toResponseString(new Response(runningInstanceId,true,""));
+    }
+
+    @DELETE
+    @Path("/detach/{runningInstanceId}")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public String detachRelaySourceDataStream(@PathParam("runningInstanceId") String runningInstanceId) {
+
+        Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(runningInstanceId);
+        if (relay != null) {
+            relay.values().forEach(EventRelay::stop);
+        }
+
+        RunningRelayInstances.INSTANCE.remove(runningInstanceId);
+
+        return Util.toResponseString(new Response(runningInstanceId, true, ""));
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
deleted file mode 100644
index 8f99a8f..0000000
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.controller.container.rest;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
-import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
-
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.Response;
-
-@Path("/api/v2/relay")
-public class DebugRelayResource extends AbstractNodeContainerResource {
-
-    // TODO: Debug-only.
-    @POST
-    @Path("/start")
-    public Response debugRelayEventStream(String msg) throws SpRuntimeException {
-        // TODO implement
-
-//        System.out.println(msg);
-//        EventRelayManager eventRelayManager = new EventRelayManager();
-//        eventRelayManager.start();
-//        RunningRelayInstances.INSTANCE.add(eventRelayManager.getRelayTopic(), eventRelayManager);
-
-        return ok();
-    }
-
-    @POST
-    @Path("/stop")
-    public Response debugStopRelayEventStream(String msg) throws SpRuntimeException {
-        // TODO implement
-
-//        System.out.println(msg);
-//        EventRelayManager eventRelayManager = RunningRelayInstances.INSTANCE.get("org.apache.streampipes.flowrate01");
-//        assert eventRelayManager != null;
-//        eventRelayManager.stop();
-
-        return ok();
-    }
-}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
index 30d2459..276b8fa 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
@@ -19,7 +19,7 @@ package org.apache.streampipes.node.controller.container.rest;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
-import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
+import org.apache.streampipes.node.controller.container.management.relay.EventRelay;
 import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
 import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
 import org.apache.streampipes.node.controller.container.management.resources.ResourceManager;
@@ -57,7 +57,7 @@ public class InfoStatusResource extends AbstractNodeContainerResource{
 
         List<RelayMetrics> metricsList = RunningRelayInstances.INSTANCE.getRunningInstances()
                 .stream()
-                .map(EventRelayManager::getRelayMetrics)
+                .map(EventRelay::getRelayMetrics)
                 .collect(Collectors.toList());
 
         try {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
index c899701..5f13a8d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
@@ -19,20 +19,17 @@ package org.apache.streampipes.node.controller.container.rest;
 
 import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.container.transform.Transformer;
-import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.SpDataStreamRelay;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.model.node.PipelineElementDockerContainer;
 import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
 import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
 import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
-import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
+import org.apache.streampipes.node.controller.container.management.relay.EventRelay;
 import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,10 +37,12 @@ import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 @Path("/api/v2/node/container")
-public class InvocableEntityResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
+public class InvocableEntityResource extends AbstractNodeContainerResource {
     private static final Logger LOG = LoggerFactory.getLogger(InvocableEntityResource.class.getCanonicalName());
 
     @GET
@@ -61,19 +60,8 @@ public class InvocableEntityResource<I extends InvocableStreamPipesEntity> exten
 
     @POST
     @Path("/register")
-    public void register(String body) {
-        try {
-            InvocableRegistration invocableRegistration = JacksonSerializer
-                    .getObjectMapper()
-                    .readValue(body, InvocableRegistration.class);
-
-            // register pipeline elements at consul and node controller
-            InvocableElementManager.getInstance().register(invocableRegistration);
-            LOG.info("Sucessfully registered pipeline element container");
-
-        } catch (IOException e) {
-            LOG.error("Could not register pipeline element container - " + e.toString());
-        }
+    public void register(InvocableRegistration registration) {
+        InvocableElementManager.getInstance().register(registration);
     }
 
     @POST
@@ -83,30 +71,29 @@ public class InvocableEntityResource<I extends InvocableStreamPipesEntity> exten
     public String invoke(@PathParam("identifier") String identifier,
                            @PathParam("elementId") String elementId, String payload) {
 
-        // TODO implement
         String endpoint;
         InvocableStreamPipesEntity graph;
         try {
             if (identifier.equals("sepa")) {
                 graph = Transformer.fromJsonLd(DataProcessorInvocation.class, payload);
-
                 endpoint = graph.getBelongsTo();
-
                 TransportProtocol source = ((DataProcessorInvocation) graph)
                         .getOutputStream()
                         .getEventGrounding()
                         .getTransportProtocol();
 
-                String relayStrategy = ((DataProcessorInvocation) graph).getEventRelayStrategy();
+                String strategy = ((DataProcessorInvocation) graph).getEventRelayStrategy();
+                List<SpDataStreamRelay> dataStreamRelays = ((DataProcessorInvocation) graph).getOutputStreamRelays();
 
-                ((DataProcessorInvocation) graph).getOutputStreamRelays().forEach(r -> {
+                Map<String, EventRelay> eventRelayMap = new HashMap<>();
+                dataStreamRelays.forEach(r -> {
                     TransportProtocol target = r.getEventGrounding().getTransportProtocol();
-                    EventRelayManager relayManager = new EventRelayManager(source, target, relayStrategy);
-                    relayManager.start();
 
-                    RunningRelayInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(),
-                            relayManager);
+                    EventRelay eventRelay = new EventRelay(source, target, strategy);
+                    eventRelay.start();
+                    eventRelayMap.put(r.getElementId(), eventRelay);
                 });
+                RunningRelayInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), eventRelayMap);
 
                 org.apache.streampipes.model.Response resp = InvocableElementManager.getInstance().invoke(endpoint,
                         payload);
@@ -123,7 +110,14 @@ public class InvocableEntityResource<I extends InvocableStreamPipesEntity> exten
                 graph = Transformer.fromJsonLd(DataSinkInvocation.class, payload);
                 endpoint = graph.getBelongsTo();
 
-                InvocableElementManager.getInstance().invoke(endpoint, payload);
+                org.apache.streampipes.model.Response resp = InvocableElementManager.getInstance()
+                        .invoke(endpoint, payload);
+
+                if (resp.isSuccess()) {
+                    RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
+                }
+
+                return resp.toString();
 
             }
         } catch (IOException e) {
@@ -141,15 +135,13 @@ public class InvocableEntityResource<I extends InvocableStreamPipesEntity> exten
         LOG.info("receive stop request elementId={}, runningInstanceId={}", elementId, runningInstanceId);
 
         // TODO store host and port locally to retrieve by runningInstanceId
-
         String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
         String resp = InvocableElementManager.getInstance().detach(endpoint + "/" + runningInstanceId);
 
         // Stop relay for invocable if existing
-        // TODO: maybe use unique identifier for retrieving relay
-        EventRelayManager relay = RunningRelayInstances.INSTANCE.get(runningInstanceId);
+        Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(runningInstanceId);
         if (relay != null) {
-            relay.stop();
+            relay.values().forEach(EventRelay::stop);
         }
 
         RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
@@ -165,5 +157,4 @@ public class InvocableEntityResource<I extends InvocableStreamPipesEntity> exten
         InvocableElementManager.getInstance().unregister();
         return ok(DockerContainerOrchestrator.getInstance().remove(container));
     }
-
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
index a18816c..8e1e117 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
@@ -27,8 +27,6 @@ public class NodeControllerResourceConfig extends ResourceConfig {
         register(HealthCheckResource.class);
         register(InfoStatusResource.class);
         register(InvocableEntityResource.class);
-
-        // TODO remove later - only for local relay tests
-        register(DebugRelayResource.class);
+        register(DataStreamRelayResource.class);
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index 173e0b0..edbb551 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.manager.execution.http;
 
+import org.apache.streampipes.model.SpDataStreamRelayContainer;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class GraphSubmitter {
 
@@ -37,18 +39,23 @@ public class GraphSubmitter {
   private final List<SpDataSet> dataSets;
   private final String pipelineId;
   private final String pipelineName;
+  private final List<SpDataStreamRelayContainer> streamRelays;
 
   public GraphSubmitter(String pipelineId, String pipelineName, List<InvocableStreamPipesEntity> graphs,
-                        List<SpDataSet> dataSets) {
+                        List<SpDataSet> dataSets, List<SpDataStreamRelayContainer> streamRelays) {
     this.graphs = graphs;
     this.pipelineId = pipelineId;
     this.pipelineName = pipelineName;
     this.dataSets = dataSets;
+    this.streamRelays = streamRelays;
   }
 
   public PipelineOperationStatus invokeGraphs() {
     PipelineOperationStatus status = initPipelineOperationStatus();
 
+    if (streamRelays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0)) {
+      streamRelays.forEach(streamRelay -> invoke(new StreamRelayEndpointUrlGenerator(streamRelay), streamRelay, status));
+    }
     graphs.forEach(graph -> invoke(new InvocableEntityUrlGenerator(graph), graph, status));
     // only invoke datasets when following pipeline elements are started
     if (allInvocableEntitiesRunning(status)) {
@@ -67,6 +74,9 @@ public class GraphSubmitter {
 
     graphs.forEach(graph -> detach(new InvocableEntityUrlGenerator(graph), graph, status));
     dataSets.forEach(dataset -> detach(new DataSetEntityUrlGenerator(dataset), dataset, status));
+    if (streamRelays.stream().anyMatch(s -> s.getOutputStreamRelays().size() > 0)) {
+      streamRelays.forEach(streamRelay -> detach(new StreamRelayEndpointUrlGenerator(streamRelay), streamRelay, status));
+    }
 
     return verifyPipelineOperationStatus(
             status,
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index 6ab296a..f6706eb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@ -18,14 +18,17 @@
 
 package org.apache.streampipes.manager.execution.http;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.gson.Gson;
 import com.google.gson.JsonSyntaxException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
 import org.apache.streampipes.commons.Utils;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.serializers.jsonld.JsonLdTransformer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,12 +50,18 @@ public class HttpRequestBuilder {
   }
 
   public PipelineElementStatus invoke() {
-    LOG.info("Invoking element: " + endpointUrl);
     try {
-      String jsonLd = jsonLd();
+      String json;
+      if (payload instanceof InvocableStreamPipesEntity) {
+        LOG.info("Invoking pipeline element: " + endpointUrl);
+        json = jsonLd();
+      } else {
+        LOG.info("Invoking data stream relay: " + endpointUrl);
+        json = jackson();
+      }
       Response httpResp = Request
               .Post(endpointUrl)
-              .bodyString(jsonLd, ContentType.APPLICATION_JSON)
+              .bodyString(json, ContentType.APPLICATION_JSON)
               .connectTimeout(CONNECT_TIMEOUT)
               .execute();
       return handleResponse(httpResp);
@@ -63,6 +72,12 @@ public class HttpRequestBuilder {
   }
 
   public PipelineElementStatus detach() {
+    if (payload instanceof InvocableStreamPipesEntity) {
+      LOG.info("Detaching pipeline element: " + endpointUrl);
+    } else {
+      LOG.info("Detaching data stream relay: " + endpointUrl);
+    }
+
     try {
       Response httpResp = Request
               .Delete(endpointUrl)
@@ -88,4 +103,8 @@ public class HttpRequestBuilder {
   private PipelineElementStatus convert(org.apache.streampipes.model.Response response) {
     return new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(), response.getOptionalMessage());
   }
+
+  private String jackson() throws JsonProcessingException {
+    return JacksonSerializer.getObjectMapper().writeValueAsString(payload);
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index ce16327..9172398 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -18,6 +18,10 @@
 
 package org.apache.streampipes.manager.execution.http;
 
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.SpDataStreamRelay;
+import org.apache.streampipes.model.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.grounding.EventGrounding;
 import org.lightcouch.DocumentConflictException;
 import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
 import org.apache.streampipes.manager.execution.status.SepMonitoringManager;
@@ -36,10 +40,7 @@ import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.user.management.encryption.CredentialsManager;
 
 import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public class PipelineExecutor {
@@ -76,15 +77,24 @@ public class PipelineExecutor {
 
     graphs.forEach(g -> g.setStreamRequirements(Arrays.asList()));
 
-    PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(), decryptedGraphs, dataSets)
-            .invokeGraphs();
+    List<SpDataStreamRelayContainer> dataStreamRelayContainers = generateDataStreamRelays(decryptedGraphs);
+
+    PipelineOperationStatus status = new GraphSubmitter(
+            pipeline.getPipelineId(),
+            pipeline.getName(),
+            decryptedGraphs,
+            dataSets,
+            dataStreamRelayContainers).invokeGraphs();
 
     if (status.isSuccess()) {
       storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
 
-      PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
-              new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), PipelineStatusMessageType.PIPELINE_STARTED.title(), PipelineStatusMessageType.PIPELINE_STARTED.description()));
+      PipelineStatusManager.addPipelineStatus(
+              pipeline.getPipelineId(),
+              new PipelineStatusMessage(pipeline.getPipelineId(),
+                      System.currentTimeMillis(),
+                      PipelineStatusMessageType.PIPELINE_STARTED.title(),
+                      PipelineStatusMessageType.PIPELINE_STARTED.description()));
 
       if (monitor) {
         SepMonitoringManager.addObserver(pipeline.getPipelineId());
@@ -97,40 +107,18 @@ public class PipelineExecutor {
     return status;
   }
 
-  private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
-    List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
-    graphs.stream().map(g -> {
-      if (g instanceof DataProcessorInvocation) {
-        return new DataProcessorInvocation((DataProcessorInvocation) g);
-      } else {
-        return new DataSinkInvocation((DataSinkInvocation) g);
-      }
-    }).forEach(g -> {
-      g.getStaticProperties()
-              .stream()
-              .filter(SecretStaticProperty.class::isInstance)
-              .forEach(sp -> {
-                try {
-                  String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
-                          ((SecretStaticProperty) sp).getValue());
-                  ((SecretStaticProperty) sp).setValue(decrypted);
-                  ((SecretStaticProperty) sp).setEncrypted(false);
-                } catch (GeneralSecurityException e) {
-                  e.printStackTrace();
-                }
-              });
-      decryptedGraphs.add(g);
-    });
-    return decryptedGraphs;
-  }
-
   public PipelineOperationStatus stopPipeline() {
     List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
     List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
 
-    PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(),  graphs, dataSets)
-            .detachGraphs();
+    List<SpDataStreamRelayContainer> dataStreamRelayContainers = generateDataStreamRelays(graphs);
+
+    PipelineOperationStatus status = new GraphSubmitter(
+            pipeline.getPipelineId(),
+            pipeline.getName(),
+            graphs,
+            dataSets,
+            dataStreamRelayContainers).detachGraphs();
 
     if (status.isSuccess()) {
       if (visualize) {
@@ -158,6 +146,82 @@ public class PipelineExecutor {
     return status;
   }
 
+  private String extractTopic(EventGrounding eg) {
+    return eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
+  }
+
+  private List<SpDataStreamRelayContainer> generateDataStreamRelays(List<InvocableStreamPipesEntity> graphs) {
+    Set<String> topicSet = new HashSet<>();
+    List<SpDataStreamRelayContainer> dsRelayContainers = new ArrayList<>();
+    SpDataStreamRelayContainer dsRelayContainer = new SpDataStreamRelayContainer();
+    List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
+
+    graphs.stream()
+            .filter(g -> pipeline.getStreams().stream()
+                    .filter(ds -> topicSet.add(extractTopic(ds.getEventGrounding())))
+                    .anyMatch(ds -> (!matchingDeploymentTargets(ds, g) && connected(ds, g))))
+            .forEach(g -> pipeline.getStreams()
+                    .forEach(ds -> {
+                      dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
+                      // TODO: retrieve relay strategy from somewhere, e.g. make it accessible on pipeline level
+                      dsRelayContainer.setEventRelayStrategy(pipeline.getEventRelayStrategy());
+                      dsRelayContainer.setName(ds.getName() + " (Data Stream Relay)");
+                      dsRelayContainer.setInputGrounding(new EventGrounding(ds.getEventGrounding()));
+                      dsRelayContainer.setDeploymentTargetNodeId(ds.getDeploymentTargetNodeId());
+                      dsRelayContainer.setDeploymentTargetNodeHostname(ds.getDeploymentTargetNodeHostname());
+                      dsRelayContainer.setDeploymentTargetNodePort(ds.getDeploymentTargetNodePort());
+
+                      dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(g.getInputStreams()
+                              .get(getIndex(ds.getDOM(), g))
+                              .getEventGrounding())));
+                    })
+            );
+
+    dsRelayContainer.setOutputStreamRelays(dataStreamRelays);
+    dsRelayContainers.add(dsRelayContainer);
+
+    return dsRelayContainers;
+  }
+
+  private boolean matchingDeploymentTargets(SpDataStream source, InvocableStreamPipesEntity target) {
+    return source.getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
+  }
+
+  private boolean connected(SpDataStream source, InvocableStreamPipesEntity target) {
+    int index = getIndex(source.getDOM(), target);
+    if (index != -1) {
+      return target.getConnectedTo().get(index).equals(source.getDOM());
+    }
+    return false;
+  }
+
+  private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
+    List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
+    graphs.stream().map(g -> {
+      if (g instanceof DataProcessorInvocation) {
+        return new DataProcessorInvocation((DataProcessorInvocation) g);
+      } else {
+        return new DataSinkInvocation((DataSinkInvocation) g);
+      }
+    }).forEach(g -> {
+      g.getStaticProperties()
+              .stream()
+              .filter(SecretStaticProperty.class::isInstance)
+              .forEach(sp -> {
+                try {
+                  String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
+                          ((SecretStaticProperty) sp).getValue());
+                  ((SecretStaticProperty) sp).setValue(decrypted);
+                  ((SecretStaticProperty) sp).setEncrypted(false);
+                } catch (GeneralSecurityException e) {
+                  e.printStackTrace();
+                }
+              });
+      decryptedGraphs.add(g);
+    });
+    return decryptedGraphs;
+  }
+
   private void setPipelineStarted(Pipeline pipeline) {
     pipeline.setRunning(true);
     pipeline.setStartedAt(new Date().getTime());
@@ -183,4 +247,8 @@ public class PipelineExecutor {
     return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
   }
 
+  private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity targetElement) {
+    return targetElement.getConnectedTo().indexOf(sourceDomId);
+  }
+
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
new file mode 100644
index 0000000..e62ff41
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StreamRelayEndpointUrlGenerator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+import org.apache.streampipes.model.SpDataStreamRelayContainer;
+
+public class StreamRelayEndpointUrlGenerator extends EndpointUrlGenerator<SpDataStreamRelayContainer> {
+
+    private static final String BASE_ROUTE = "api/v2/node/stream/relay";
+    private static final String INVOKE_ROUTE = "/invoke";
+    private static final String DETACH_ROUTE = "/detach";
+
+    private final SpDataStreamRelayContainer streamRelay;
+
+    public StreamRelayEndpointUrlGenerator(SpDataStreamRelayContainer streamRelay) {
+        super(streamRelay);
+        this.streamRelay = streamRelay;
+    }
+
+    @Override
+    public String generateInvokeEndpoint() {
+        return generateEndpoint(INVOKE_ROUTE);
+    }
+
+    @Override
+    public String generateDetachEndpoint() {
+        return generateEndpoint(DETACH_ROUTE)
+                + SLASH
+                + streamRelay.getRunningStreamRelayInstanceId();
+    }
+
+    private String generateEndpoint(String path) {
+        return HTTP_PROTOCOL
+                + streamRelay.getDeploymentTargetNodeHostname()
+                + COLON
+                + streamRelay.getDeploymentTargetNodePort()
+                + SLASH
+                + BASE_ROUTE
+                + path;
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index 2375543..e4c61ac 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -18,7 +18,9 @@
 
 package org.apache.streampipes.manager.matching;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.config.backend.SpEdgeNodeProtocol;
 import org.apache.streampipes.config.backend.SpProtocol;
 import org.apache.streampipes.container.util.ConsulUtil;
 import org.apache.streampipes.manager.data.PipelineGraph;
@@ -30,6 +32,7 @@ import org.apache.streampipes.model.SpDataStreamRelay;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
 import org.apache.streampipes.model.output.OutputStrategy;
@@ -41,6 +44,7 @@ import java.util.stream.Collectors;
 
 public class InvocationGraphBuilder {
 
+  private static final String DEFAULT_TAG = "default";
   private final PipelineGraph pipelineGraph;
   private final String pipelineId;
   private Integer uniquePeIndex = 0;
@@ -62,7 +66,6 @@ public class InvocationGraphBuilder {
   }
 
   private void configure(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
-
     EventGrounding inputGrounding = new GroundingBuilder(source, targets).getEventGrounding();
 
     // set output stream event grounding for source data processors
@@ -83,163 +86,117 @@ public class InvocationGraphBuilder {
     // set input stream event grounding for target element data processors and sinks
     targets.forEach(t -> {
       // check if source and target share same node
-      if (source instanceof InvocableStreamPipesEntity) {
-        if (((InvocableStreamPipesEntity) source).getDeploymentTargetNodeId() != null ||
-                t.getDeploymentTargetNodeId() != null) {
+      if (source instanceof InvocableStreamPipesEntity && deploymentTargetNotNull(source, t)) {
 
-          if (matchingDeploymentTarget((InvocableStreamPipesEntity) source, t)) {
-            // both PE on same node - share grounding
+          if (matchingDeploymentTargets(source, t)) {
+            // both processor on same node - share grounding
             t.getInputStreams()
                     .get(getIndex(source.getDOM(), t))
                     .setEventGrounding(inputGrounding);
 
-          } else {
-            // check if target runs on cloud or edge node
-            if (t.getDeploymentTargetNodeId().equals("default")) {
-              // target runs on cloud node: use central cloud broker, e.g. kafka
-              // TODO: set event relay to true
-              // TODO: add cloud broker to List<EventRelays>
-              if (source instanceof DataProcessorInvocation) {
-
-                String relayTopic = inputGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName();
-
-                if (relayNotExists(relayTopic, source)) {
-                  // TODO: use prioritized cloud transport protocol instead of kafka
-                  SpProtocol prioritizedProtocol =
-                          BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
-
-                  EventGrounding relayEventGrounding = new EventGrounding();
-
-                  if (isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
-                    JmsTransportProtocol tp = new JmsTransportProtocol(
-                            BackendConfig.INSTANCE.getJmsHost(),
-                            BackendConfig.INSTANCE.getJmsPort(),
-                            relayTopic);
-                    relayEventGrounding.setTransportProtocol(tp);
-                  }
-                  else if (isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)) {
-                    KafkaTransportProtocol tp = new KafkaTransportProtocol(
-                            BackendConfig.INSTANCE.getKafkaHost(),
-                            BackendConfig.INSTANCE.getKafkaPort(),
-                            relayTopic,
-                            BackendConfig.INSTANCE.getZookeeperHost(),
-                            BackendConfig.INSTANCE.getZookeeperPort());
-                    relayEventGrounding.setTransportProtocol(tp);
-                  }
-                  else if (isPrioritized(prioritizedProtocol, MqttTransportProtocol.class)){
-                    MqttTransportProtocol tp = new MqttTransportProtocol(
-                            BackendConfig.INSTANCE.getMqttHost(),
-                            BackendConfig.INSTANCE.getMqttPort(),
-                            relayTopic);
-                    relayEventGrounding.setTransportProtocol(tp);
-                  }
-
-                  relayEventGrounding.setTransportFormats(inputGrounding.getTransportFormats());
-
-                  // TODO: when modifying pipelines new relay are added to old ones. Should initialize new ArrayList
-                  // graphExists()
-                  if(!graphExists(t.getDOM())) {
-                    ((DataProcessorInvocation) source)
-                            .addOutputStreamRelay(new SpDataStreamRelay(relayEventGrounding));
-                  } else {
-                    ((DataProcessorInvocation) source)
-                            .getOutputStreamRelays()
-                            .get(getIndex(source.getDOM(), t))
-                            .getEventGrounding()
-                            .getTransportProtocol()
-                            .setTopicDefinition(inputGrounding.getTransportProtocol().getTopicDefinition());
-                  }
-
-                  t.getInputStreams()
-                          .get(getIndex(source.getDOM(), t))
-                          .getEventGrounding()
-                          .getTransportProtocol()
-                          .setTopicDefinition(inputGrounding.getTransportProtocol().getTopicDefinition());
-
-                } else {
-                  // split in the end
-                  t.getInputStreams()
-                      .get(getIndex(source.getDOM(), t))
-                      .getEventGrounding()
-                      .getTransportProtocol()
-                      .setTopicDefinition(inputGrounding.getTransportProtocol().getTopicDefinition());
-                }
+          }
+          else if (defaultDeploymentTarget(t) && source instanceof DataProcessorInvocation) {
+            // target runs on cloud node: use central cloud broker, e.g. kafka
+
+            if (!eventRelayExists(source, t)) {
+
+              if(!graphExists(t.getDOM())) {
+                // add initial relay grounding to source processor
+                ((DataProcessorInvocation) source).addOutputStreamRelay(
+                        new SpDataStreamRelay(generateRelayGrounding(inputGrounding,false)));
+              } else {
+                // modify relay topic of existing relay grounding
+                modifyTopicForEventRelay(source, t, extractTopic(inputGrounding));
               }
-
+              modifyTopicForTargetInputStream(source, t, extractTopic(inputGrounding));
             } else {
-              // target runs on edge node: use target edge node broker
-              // TODO: set event relay to true
-              // TODO: add target edge node broker to List<EventRelays>
-
-              String relayTopic = inputGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName();
-
-              if (relayNotExists(relayTopic, source)) {
-
-                EventGrounding relayEventGrounding = new EventGrounding();
-
-                relayEventGrounding.setTransportProtocol(
-                        new MqttTransportProtocol(
-                                getTargetNodeBrokerHost(t),
-                                getTargetNodeBrokerPort(t),
-                                relayTopic
-                        ));
-
-                relayEventGrounding.setTransportFormats(inputGrounding.getTransportFormats());
-
-                // TODO: when modifying pipelines new relay are added to old ones. Should initialize new ArrayList
-                if(!graphExists(t.getDOM())) {
-                  ((DataProcessorInvocation) source)
-                          .addOutputStreamRelay(new SpDataStreamRelay(relayEventGrounding));
-                } else {
-                  t.getInputStreams()
-                          .get(getIndex(source.getDOM(), t))
-                          .setEventGrounding(relayEventGrounding);
-                }
 
+              EventGrounding updatedRelayGrounding = generateRelayGrounding(inputGrounding, false);
+              removeExistingStreamRelay(source, t);
+              ((DataProcessorInvocation) source).addOutputStreamRelay(new SpDataStreamRelay(updatedRelayGrounding));
+              modifyTopicForTargetInputStream(source, t, extractTopic(inputGrounding));
+            }
+          }
+          else {
+            // target runs on other edge node: use target edge node broker
+            //if (!eventRelayExists(inputGrounding, source)) {
+            if (!eventRelayExists(source ,t)) {
+
+              EventGrounding relayGrounding = generateRelayGrounding(inputGrounding, t,true);
+              if(!graphExists(t.getDOM())) {
+                ((DataProcessorInvocation) source).addOutputStreamRelay(new SpDataStreamRelay(relayGrounding));
+              } else {
                 t.getInputStreams()
                         .get(getIndex(source.getDOM(), t))
-                        .setEventGrounding(((DataProcessorInvocation) source)
-                                .getOutputStreamRelays()
-                                .get(getIndex(source.getDOM(), t))
-                                .getEventGrounding());
-
+                        .setEventGrounding(relayGrounding);
               }
-//                t.getInputStreams()
-//                        .get(getIndex(source.getDOM(), t))
-//                        .setEventGrounding(((DataProcessorInvocation) source)
-//                                .getOutputStreamRelays()
-//                                .get(getIndex(source.getDOM(), t))
-//                                .getEventGrounding());
 
+              t.getInputStreams()
+                      .get(getIndex(source.getDOM(), t))
+                      .setEventGrounding(((DataProcessorInvocation) source)
+                              .getOutputStreamRelays()
+                              .get(getIndex(source.getDOM(), t))
+                              .getEventGrounding());
+            } else {
+              EventGrounding updatedRelayGrounding = generateRelayGrounding(inputGrounding, t,true);
+              removeExistingStreamRelay(source, t);
+              ((DataProcessorInvocation) source).addOutputStreamRelay(new SpDataStreamRelay(updatedRelayGrounding));
+
+              t.getInputStreams()
+                      .get(getIndex(source.getDOM(), t))
+                      .setEventGrounding(updatedRelayGrounding);
             }
           }
+//            t.getInputStreams()
+//                    .get(getIndex(source.getDOM(), t))
+//                    .setEventGrounding(inputGrounding);
         } else {
-            t.getInputStreams()
-                    .get(getIndex(source.getDOM(), t))
-                    .setEventGrounding(inputGrounding);
-        }
-      } else {
 
         // TODO: Handle following edge situation:
         //  data stream -> invocable (processor, sink) in edge deployments that do not reside on same node
         // idea: trigger corresponding node controller to relay topic to adjecent broker (either node broker or
         // global cloud broker)
-        t.getInputStreams()
-                .get(getIndex(source.getDOM(), t))
-                .setEventGrounding(inputGrounding);
+
+          if (matchingDeploymentTargets(source, t)) {
+            t.getInputStreams()
+                    .get(getIndex(source.getDOM(), t))
+                    .setEventGrounding(inputGrounding);
+
+          } else if (defaultDeploymentTarget(t)) {
+            // target runs on cloud node: use central cloud broker, e.g. kafka
+            EventGrounding eg = generateRelayGrounding(inputGrounding,false);
+            // TODO: make topic unique for target in case we have multiple source stream relays to the target
+            String oldTopic = eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
+            eg.getTransportProtocol().getTopicDefinition().setActualTopicName(oldTopic + "."
+                    + this.pipelineId);
+            t.getInputStreams()
+                    .get(getIndex(source.getDOM(),t))
+                    .setEventGrounding(eg);
+          } else if (targetInvocableOnEdgeNode(t)) {
+            // case 2: target on other edge node -> relay + target node broker
+            // TODO: make topic unique for target in case we have multiple source stream relays to the target
+            EventGrounding eg = generateRelayGrounding(inputGrounding,t,true);
+            String oldTopic = eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
+            eg.getTransportProtocol().getTopicDefinition().setActualTopicName(oldTopic + "."
+                    + this.pipelineId);
+            t.getInputStreams()
+                    .get(getIndex(source.getDOM(), t))
+                    .setEventGrounding(eg);
+          } else {
+            // default case while modelling. no deployment target known
+            t.getInputStreams()
+                    .get(getIndex(source.getDOM(), t))
+                    .setEventGrounding(inputGrounding);
+          }
       }
 
-      // old
-//      t.getInputStreams()
-//              .get(getIndex(source.getDOM(), t))
-//              .setEventGrounding(inputGrounding);
 
       t.getInputStreams()
               .get(getIndex(source.getDOM(), t))
               .setEventSchema(getInputSchema(source));
 
-      String elementIdentifier = makeElementIdentifier(pipelineId, inputGrounding
-              .getTransportProtocol().getTopicDefinition().getActualTopicName(), t.getName());
+      String elementIdentifier = makeElementIdentifier(pipelineId,
+              inputGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName(), t.getName());
 
       t.setElementId(t.getBelongsTo() + "/" + elementIdentifier);
       t.setDeploymentRunningInstanceId(elementIdentifier);
@@ -253,14 +210,71 @@ public class InvocationGraphBuilder {
     });
   }
 
-  private boolean relayNotExists(String relayTopic, NamedStreamPipesEntity source) {
-    return ((DataProcessorInvocation) source)
+  private void removeExistingStreamRelay(NamedStreamPipesEntity source, InvocableStreamPipesEntity t) {
+    ((DataProcessorInvocation) source).removeOutputStreamRelay(
+            ((DataProcessorInvocation) source)
+                    .getOutputStreamRelays()
+                    .get(getIndex(source.getDOM(), t)));
+  }
+
+  private boolean targetInvocableOnEdgeNode(InvocableStreamPipesEntity t) {
+    return t.getDeploymentTargetNodeId() != null && !t.getDeploymentTargetNodeId().equals(DEFAULT_TAG);
+  }
+
+  private boolean defaultDeploymentTarget(InvocableStreamPipesEntity t) {
+    return t.getDeploymentTargetNodeId() != null && t.getDeploymentTargetNodeId().equals(DEFAULT_TAG);
+  }
+
+  private void modifyTopicForTargetInputStream(NamedStreamPipesEntity s, InvocableStreamPipesEntity t,
+                                               String topic) {
+    t.getInputStreams()
+            .get(getIndex(s.getDOM(), t))
+            .getEventGrounding()
+            .getTransportProtocol()
+            .getTopicDefinition()
+            .setActualTopicName(topic);
+  }
+
+  private void modifyTopicForEventRelay(NamedStreamPipesEntity s, InvocableStreamPipesEntity t,
+                                        String topic) {
+    ((DataProcessorInvocation) s)
+            .getOutputStreamRelays()
+            .get(getIndex(s.getDOM(), t))
+            .getEventGrounding()
+            .getTransportProtocol()
+            .getTopicDefinition()
+            .setActualTopicName(topic);
+  }
+
+  private boolean deploymentTargetNotNull(NamedStreamPipesEntity s, InvocableStreamPipesEntity t) {
+    if (s instanceof SpDataStream) {
+      return ((SpDataStream) s).getDeploymentTargetNodeId() != null && t.getDeploymentTargetNodeId() != null;
+    } else {
+      return ((InvocableStreamPipesEntity) s).getDeploymentTargetNodeId() != null &&
+              t.getDeploymentTargetNodeId() != null;
+    }
+  }
+
+  private String extractTopic(EventGrounding eg) {
+    return eg.getTransportProtocol().getTopicDefinition().getActualTopicName();
+  }
+
+  private boolean eventRelayExists(EventGrounding eg, NamedStreamPipesEntity s) {
+    return ((DataProcessorInvocation) s)
             .getOutputStreamRelays()
             .stream()
-            .noneMatch(r -> r.getEventGrounding()
+            .anyMatch(r -> r.getEventGrounding()
                     .getTransportProtocol()
                     .getTopicDefinition()
-                    .getActualTopicName().equals(relayTopic));
+                    .getActualTopicName().equals(extractTopic(eg)));
+  }
+
+  private boolean eventRelayExists(NamedStreamPipesEntity s, InvocableStreamPipesEntity t) {
+    int idx = getIndex(s.getDOM(), t);
+    return ((DataProcessorInvocation) s).getOutputStreamRelays()
+            .stream()
+            .anyMatch(i -> extractTopic(i.getEventGrounding()).equals(
+                    extractTopic(t.getInputStreams().get(idx).getEventGrounding())));
   }
 
   private Tuple2<EventSchema,? extends OutputStrategy> getOutputSettings(DataProcessorInvocation dataProcessorInvocation) {
@@ -311,11 +325,17 @@ public class InvocationGraphBuilder {
   }
 
 
-  private boolean matchingDeploymentTarget(InvocableStreamPipesEntity source, InvocableStreamPipesEntity target) {
-    if (source instanceof DataProcessorInvocation && target instanceof DataProcessorInvocation) {
-      if (source.getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId())) {
-        return true;
-      }
+  private boolean matchingDeploymentTargets(NamedStreamPipesEntity s, InvocableStreamPipesEntity t) {
+    if (s instanceof DataProcessorInvocation &&
+            (t instanceof DataProcessorInvocation || t instanceof DataSinkInvocation) && deploymentTargetNotNull(s,t)) {
+        return ((DataProcessorInvocation) s)
+                .getDeploymentTargetNodeId()
+                .equals(t.getDeploymentTargetNodeId());
+    } else if (s instanceof SpDataStream &&
+            (t instanceof DataProcessorInvocation || t instanceof DataSinkInvocation) && deploymentTargetNotNull(s,t)) {
+        return ((SpDataStream) s)
+                .getDeploymentTargetNodeId()
+                .equals(t.getDeploymentTargetNodeId());
     }
     return false;
   }
@@ -379,8 +399,83 @@ public class InvocationGraphBuilder {
             .get();
   }
 
-  public static Boolean isPrioritized(SpProtocol prioritizedProtocol,
-                                      Class<?> protocolClass) {
-    return prioritizedProtocol.getProtocolClass().equals(protocolClass.getCanonicalName());
+  private EventGrounding generateRelayGrounding(EventGrounding sourceInvocableOutputGrounding,
+                                                boolean edgeToEdgeRelay) {
+    return generateRelayGrounding(sourceInvocableOutputGrounding, null, edgeToEdgeRelay);
+  }
+
+  private EventGrounding generateRelayGrounding(EventGrounding sourceInvocableOutputGrounding,
+                                                InvocableStreamPipesEntity target, boolean edgeToEdgeRelay) {
+    EventGrounding eg = new EventGrounding();
+    String topic = extractTopic(sourceInvocableOutputGrounding);
+    if (edgeToEdgeRelay) {
+      eg.setTransportProtocol(getTargetNodeProtocol(
+              BackendConfig.INSTANCE.getMessagingSettings().getEdgeNodeProtocol(), topic, target));
+    } else {
+      eg.setTransportProtocol(getPrioritizedGlobalProtocol(
+              BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0), topic));
+    }
+    eg.setTransportFormats(sourceInvocableOutputGrounding.getTransportFormats());
+    return eg;
+  }
+
+  private TransportProtocol getPrioritizedGlobalProtocol(SpProtocol p, String topic) {
+    if (matches(p, JmsTransportProtocol.class)) {
+      return jmsTransportProtocol(topic);
+    } else if (matches(p, KafkaTransportProtocol.class)) {
+      return kafkaTransportProtocol(topic);
+    } else if (matches(p, MqttTransportProtocol.class)){
+      return mqttTransportProtocol(topic);
+    }
+    throw new SpRuntimeException("Could not retrieve prioritized transport protocol");
+  }
+
+  private TransportProtocol getTargetNodeProtocol(SpEdgeNodeProtocol p, String topic,
+                                                  InvocableStreamPipesEntity target) {
+    if (matches(p, MqttTransportProtocol.class)){
+      return mqttTransportProtocol(topic, target);
+    }
+    throw new SpRuntimeException("Could not retrieve prioritized transport protocol");
+  }
+
+  private JmsTransportProtocol jmsTransportProtocol(String topic) {
+    return new JmsTransportProtocol(
+            BackendConfig.INSTANCE.getJmsHost(),
+            BackendConfig.INSTANCE.getJmsPort(),
+            topic);
+  }
+
+  private KafkaTransportProtocol kafkaTransportProtocol(String topic) {
+    return new KafkaTransportProtocol(
+            BackendConfig.INSTANCE.getKafkaHost(),
+            BackendConfig.INSTANCE.getKafkaPort(),
+            topic,
+            BackendConfig.INSTANCE.getZookeeperHost(),
+            BackendConfig.INSTANCE.getZookeeperPort());
+  }
+
+  private MqttTransportProtocol mqttTransportProtocol(String topic) {
+    return mqttTransportProtocol(topic, null);
+  }
+
+  private MqttTransportProtocol mqttTransportProtocol(String topic, InvocableStreamPipesEntity target) {
+    if (target != null) {
+      return new MqttTransportProtocol(
+              getTargetNodeBrokerHost(target),
+              getTargetNodeBrokerPort(target),
+              topic);
+    }
+    return new MqttTransportProtocol(
+            BackendConfig.INSTANCE.getMqttHost(),
+            BackendConfig.INSTANCE.getMqttPort(),
+            topic);
+  }
+
+  private <T extends TransportProtocol> boolean matches(SpProtocol p, Class<T> clazz) {
+    return p.getProtocolClass().equals(clazz.getCanonicalName());
+  }
+
+  private <T extends TransportProtocol> boolean matches(SpEdgeNodeProtocol p, Class<T> clazz) {
+    return p.getProtocolClass().equals(clazz.getCanonicalName());
   }
 }
\ No newline at end of file
diff --git a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
index 4b6f1e7..7d0c404 100644
--- a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
+++ b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/CustomAnnotationProvider.java
@@ -184,7 +184,8 @@ public class CustomAnnotationProvider implements EmpireAnnotationProvider {
             DataExplorerWidgetModel.class,
             StreamPipesJsonLdContainer.class,
             DataLakeMeasure.class,
-            SpDataStreamRelay.class
+            SpDataStreamRelay.class,
+            SpDataStreamRelayContainer.class
     );
   }
 }
diff --git a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
index 5d2be10..6a6d059 100644
--- a/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
+++ b/streampipes-serializers/src/main/java/org/apache/streampipes/serializers/jsonld/JsonLdTransformer.java
@@ -69,7 +69,8 @@ public class JsonLdTransformer implements RdfTransformer {
           StreamPipes.CONNECT_WORKER_CONTAINER,
           StreamPipes.DASHBOARD_WIDGET_MODEL,
           StreamPipes.DASHBOARD_MODEL,
-          StreamPipes.DATA_EXPLORER_WIDGET_MODEL
+          StreamPipes.DATA_EXPLORER_WIDGET_MODEL,
+          StreamPipes.DATA_STREAM_RELAY_CONTAINER
   );
 
   private List<String> selectedRootElements;
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index c91b49c..d7efdfc 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -421,4 +421,6 @@ public class StreamPipes {
   public static final String HAS_REQUIRED_FILETYPES = NS + "hasRequiredFiletypes" ;
   public static final String HAS_EVENT_RELAY = NS + "hasEventRelay";
   public static final String HAS_EVENT_RELAY_STRATEGY = NS + "hasEventRelayStrategy";
+  public static final String DATA_STREAM_RELAY_CONTAINER = NS + "DataStreamRelayContainer" ;
+  public static final String DATA_STREAM_RELAY_RUNNING_INSTANCE_ID = NS + "hasDataStreamRelayRunningInstanceId";
 }
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index e87a2d9..d831e9b 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,7 +19,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-12-12 16:11:38.
+// Generated using typescript-generator version 2.24.612 on 2020-12-19 20:40:34.
 
 export class AbstractStreamPipesEntity {
     "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
@@ -2254,6 +2254,7 @@ export class Pipeline extends ElementComposition {
     actions: DataSinkInvocation[];
     createdAt: number;
     createdByUser: string;
+    eventRelayStrategy: string;
     pipelineCategories: string[];
     publicElement: boolean;
     running: boolean;
@@ -2271,6 +2272,7 @@ export class Pipeline extends ElementComposition {
         instance.createdAt = data.createdAt;
         instance.publicElement = data.publicElement;
         instance.createdByUser = data.createdByUser;
+        instance.eventRelayStrategy = data.eventRelayStrategy;
         instance.pipelineCategories = __getCopyArrayFn(__identity<string>())(data.pipelineCategories);
         instance._id = data._id;
         instance._rev = data._rev;
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
index 0db73ad..77588b6 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
@@ -154,52 +154,6 @@
                     </div>
 
                 </div>
-<!--                <mat-divider *ngIf="advancedSettings" style="margin: 1em 0 1em 0;"></mat-divider>-->
-<!--                <div *ngIf="advancedSettings">-->
-<!--                    <b>Node Selection</b>-->
-<!--                    <div *ngFor="let processors of pipeline.sepas">-->
-<!--                        <div fxFlex="100" fxLayout="row">-->
-<!--                            <div fxFlex="50" fxLayout="row" fxLayoutAlign="left center">-->
-<!--                                <span>{{processors.name}}</span>-->
-<!--                            </div>-->
-<!--                        </div>-->
-<!--                        <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">-->
-<!--                            <mat-form-field appearance="outline">-->
-<!--                            <mat-select [(ngModel)]="processors.deploymentTargetNodeId"-->
-<!--                                        [disabled]="disableNodeSelection.value"-->
-<!--                                        style="margin: 0;height: 20px;"-->
-<!--                                        placeholder="select execution node"-->
-<!--                                        required>-->
-<!--                                <mat-option [value]="nodeInfo.nodeControllerId"-->
-<!--                                            *ngFor="let nodeInfo of deploymentOptions[processors.appId]">-->
-<!--                                    <em>{{nodeInfo.nodeMetadata.nodeAddress}}</em>-->
-<!--                                </mat-option>-->
-<!--                            </mat-select>-->
-<!--                            </mat-form-field>-->
-<!--                        </div>-->
-<!--                    </div>-->
-<!--                    <div *ngFor="let sinks of pipeline.actions">-->
-<!--                        <div fxFlex="100" fxLayout="row">-->
-<!--                            <div fxFlex="50" fxLayout="row" fxLayoutAlign="left center">-->
-<!--                                <span>{{sinks.name}}</span>-->
-<!--                            </div>-->
-<!--                        </div>-->
-<!--                        <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">-->
-<!--                            <mat-form-field appearance="outline">-->
-<!--                                <mat-select [(ngModel)]="sinks.deploymentTargetNodeId"-->
-<!--                                            [disabled]="disableNodeSelection.value"-->
-<!--                                            style="margin: 0; height: 20px;"-->
-<!--                                            placeholder="select execution node"-->
-<!--                                            required>-->
-<!--                                    <mat-option [value]="nodeInfo.nodeControllerId" *ngFor="let nodeInfo of-->
-<!--                                    deploymentOptions[sinks.appId]">-->
-<!--                                        <em>{{nodeInfo.nodeMetadata.nodeAddress}}</em>-->
-<!--                                    </mat-option>-->
-<!--                                </mat-select>-->
-<!--                            </mat-form-field>-->
-<!--                        </div>-->
-<!--                    </div>-->
-<!--                </div>-->
             </div>
             <div fxFlex="100" fxLayout="column" fxLayoutAlign="center center" *ngIf="saving">
                 <mat-spinner [mode]="'indeterminate'" [diameter]="50"></mat-spinner>
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index f782519..6d5a748 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -240,12 +240,14 @@ export class SavePipelineComponent implements OnInit {
     if (this.currentModifiedPipelineId && this.updateMode === 'update') {
       this.modifyPipelineElementsDeployments(this.tmpPipeline.sepas)
       this.modifyPipelineElementsDeployments(this.tmpPipeline.actions)
+      this.tmpPipeline.eventRelayStrategy = this.selectedRelayStrategyVal;
       this.pipeline = this.tmpPipeline;
       storageRequest = this.pipelineService.updatePipeline(this.pipeline);
     } else {
       this.pipeline._id = undefined;
       this.modifyPipelineElementsDeployments(this.tmpPipeline.sepas)
       this.modifyPipelineElementsDeployments(this.tmpPipeline.actions)
+      this.tmpPipeline.eventRelayStrategy = this.selectedRelayStrategyVal;
       this.pipeline = this.tmpPipeline;
       storageRequest = this.pipelineService.storePipeline(this.pipeline);
     }