You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/20 09:03:05 UTC

[incubator-streampipes] 03/03: initial changes to InvocationGraphBuilder

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 3588b8a1849f10e8e0a1497fd18fb12bb9067a31
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri Nov 20 10:01:24 2020 +0100

    initial changes to InvocationGraphBuilder
---
 .../master/management/WorkerRestClient.java        |   2 +-
 .../streampipes/container/util/ConsulUtil.java     |  10 +-
 streampipes-node-controller-container/pom.xml      |   3 +-
 .../{init => }/NodeControllerContainer.java        |   5 +-
 .../controller/container/config/ConfigKeys.java    |   1 -
 .../container/management/info/NodeInfoStorage.java |   2 +-
 .../management/resource/ResourceManager.java       |  52 +++----
 .../AbstractNodeContainerResource.java}            |  49 ++++---
 .../container/rest/DebugRelayResource.java         |  55 +++++++
 .../HealthCheckResource.java}                      |  11 +-
 .../InfoStatusResource.java}                       |  14 +-
 .../NodeControllerResourceConfig.java}             |  18 +--
 .../PELifeCycleResource.java}                      |  81 +++--------
 .../http/InvocableEntityUrlGenerator.java          |   4 +-
 .../manager/matching/InvocationGraphBuilder.java   | 161 +++++++++++++++------
 .../manager/matching/ProtocolSelector.java         |  44 +++---
 .../apache/streampipes/vocabulary/StreamPipes.java |   1 +
 .../save-pipeline/save-pipeline.component.html     |  13 +-
 .../save-pipeline/save-pipeline.component.scss     |   4 +
 ui/src/app/editor/editor.module.ts                 |   4 +-
 20 files changed, 323 insertions(+), 211 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 747fdbb..3d1a4cc 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -263,7 +263,7 @@ public class WorkerRestClient {
 
     private static byte[] getIconAsset(String baseUrl,  String appId) throws AdapterException {
         String url = baseUrl + "/" + appId + "/assets/icon";
-        logger.info("Trying to get icon from endpoint: " + url);
+        logger.debug("Trying to get icon from endpoint: " + url);
 
         try {
             byte[] responseString = Request.Get(url)
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
index ecfc10e..d90df3d 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
@@ -170,8 +170,8 @@ public class ConsulUtil {
   }
 
 
-  public static int getElementEndpointPort(String route) {
-    String value = ConsulUtil.getKeyValue(route)
+  public static int getIntValue(String route) {
+    String value = getKeyValue(route)
             .values()
             .stream()
             .findFirst()
@@ -188,8 +188,8 @@ public class ConsulUtil {
     return Integer.parseInt(new Gson().fromJson(value, ConfigItem.class).getValue());
   }
 
-  public static String getElementEndpointHostname(String route) {
-    String value = ConsulUtil.getKeyValue(route)
+  public static String getStringValue(String route) {
+    String value = getKeyValue(route)
             .values()
             .stream()
             .findFirst()
@@ -241,7 +241,7 @@ public class ConsulUtil {
     for (ServiceHealth node : nodes) {
       if (node.getService().getTags().containsAll(filterByTags)) {
         String endpoint = node.getService().getAddress() + ":" + node.getService().getPort();
-        LOG.info("Active" + serviceGroup + " endpoint:" + endpoint);
+        LOG.info("Active " + serviceGroup + " endpoint: " + endpoint);
         endpoints.add(endpoint);
       }
     }
diff --git a/streampipes-node-controller-container/pom.xml b/streampipes-node-controller-container/pom.xml
index ca770a5..9f274d6 100644
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@ -100,7 +100,7 @@
         <dependency>
             <groupId>com.github.oshi</groupId>
             <artifactId>oshi-core</artifactId>
-            <version>4.1.1</version>
+            <version>5.3.6</version>
         </dependency>
         <dependency>
             <groupId>org.eclipse.paho</groupId>
@@ -116,7 +116,6 @@
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-messaging-kafka</artifactId>
             <version>0.68.0-SNAPSHOT</version>
-            <scope>compile</scope>
         </dependency>
     </dependencies>
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainer.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java
similarity index 93%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainer.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java
index 96de524..8210dc4 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainer.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.init;
+package org.apache.streampipes.node.controller.container;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -18,6 +18,7 @@ package org.apache.streampipes.node.controller.container.init;
  */
 
 import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.node.controller.container.rest.NodeControllerResourceConfig;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
 import org.apache.streampipes.node.controller.container.management.node.NodeJanitorManager;
@@ -34,7 +35,7 @@ import java.util.Collections;
 
 @Configuration
 @EnableAutoConfiguration
-@Import({ NodeControllerContainerResourceConfig.class })
+@Import({ NodeControllerResourceConfig.class })
 public class NodeControllerContainer {
 
     private static final Logger LOG =
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
index 1335ee6..e3ea78e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
@@ -31,6 +31,5 @@ public class ConfigKeys {
     final static String NODE_SUPPORTED_PE_APP_ID_KEY = "SP_NODE_SUPPORTED_PE_APP_ID";
     final static String DOCKER_PRUNING_FREQ_SECS_KEY = "SP_DOCKER_PRUNING_FREQ_SECS";
     final static String NODE_RESOURCE_UPDATE_FREQ_SECS_KEY = "SP_NODE_RESOURCE_UPDATE_FREQ_SECS";
-
     final static String NODE_EVENT_BUFFER_SIZE = "SP_NODE_EVENT_BUFFER_SIZE";
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
index 63a985c..246dd37 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
@@ -210,7 +210,7 @@ public class NodeInfoStorage {
     }
 
     private static Long getDiskUsage(FileSystem fs) {
-        OSFileStore[] fsArray = fs.getFileStores();
+        List<OSFileStore> fsArray = fs.getFileStores();
         long diskTotal = 0L;
         for(OSFileStore f : fsArray) {
             // has SATA disk
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
index aa7a835..e9b5c31 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
@@ -65,7 +65,7 @@ public class ResourceManager {
         new Thread(getCurrentResources, "rm").start();
     }
 
-    private Runnable getCurrentResources = () -> {
+    private final Runnable getCurrentResources = () -> {
 
         while(true) {
             try {
@@ -94,12 +94,12 @@ public class ResourceManager {
                 nodeResources.put("cpuLoadInPercent", cpuLoad);
                 nodeResources.put("cpuTemperature", String.format("%.2f°C", cpuTemperature));
                 nodeResources.put("cpuTemperatureCelcius", cpuTemperature);
-                nodeResources.put("freeMemory", freeMemory);
-                nodeResources.put("usedMemory", usedMemory);
-                nodeResources.put("totalMemory", totalMemory);
+                nodeResources.put("freeMemoryInBytes", freeMemory);
+                nodeResources.put("usedMemoryInBytes", usedMemory);
+                nodeResources.put("totalMemoryInBytes", totalMemory);
 
                 for (Map.Entry<String, Map<String, Long>> k : diskUsage.entrySet()) {
-                    nodeResources.put("availableDisk", k.getValue().get("available"));
+                    nodeResources.put("freeDiskSpaceInBytes", k.getValue().get("usableDiskSpace"));
                 }
 
             } catch (InterruptedException e) {
@@ -118,67 +118,67 @@ public class ResourceManager {
     }
 
     private Map<String, Map<String,Long>>  getDiskUsage(FileSystem fs) {
-        OSFileStore[] fsArray = fs.getFileStores();
+        List<OSFileStore> fsArray = fs.getFileStores();
         Map<String, Map<String, Long>> m = new HashMap<>();
         for(OSFileStore f : fsArray) {
             Map<String, Long> i = new HashMap<>();
             // has SATA disk
             if (f.getVolume().contains("/dev/sda")){
-                i.put("available", f.getUsableSpace());
-                i.put("total", f.getTotalSpace());
+                i.put("usableDiskSpace", f.getUsableSpace());
+                i.put("totalDiskSpace", f.getTotalSpace());
                 m.put(f.getVolume(), i);
             }
             else if (f.getVolume().contains("/dev/nvme")){
-                i.put("available", f.getUsableSpace());
-                i.put("total", f.getTotalSpace());
+                i.put("usableDiskSpace", f.getUsableSpace());
+                i.put("totalDiskSpace", f.getTotalSpace());
                 m.put(f.getVolume(), i);
             }
             else if (f.getVolume().contains("/dev/disk")){
-                i.put("available", f.getUsableSpace());
-                i.put("total", f.getTotalSpace());
+                i.put("usableDiskSpace", f.getUsableSpace());
+                i.put("totalDiskSpace", f.getTotalSpace());
                 m.put(f.getVolume(), i);
             }
             // Docker in RPi
             else if (f.getVolume().contains("/dev/root")){
-                i.put("available", f.getUsableSpace());
-                i.put("total", f.getTotalSpace());
+                i.put("usableDiskSpace", f.getUsableSpace());
+                i.put("totalDiskSpace", f.getTotalSpace());
                 m.put(f.getVolume(), i);
             }
             // Docker in Jetson Nano
             else if (f.getVolume().contains("/dev/mmcblk0p1")){
-                i.put("available", f.getUsableSpace());
-                i.put("total", f.getTotalSpace());
+                i.put("usableDiskSpace", f.getUsableSpace());
+                i.put("totalDiskSpace", f.getTotalSpace());
                 m.put(f.getVolume(), i);
             }
 //            // has SATA disk
 //            if (f.getVolume().contains("/dev/sda") && ( f.getMount().equals("/") || f.getMount().equals("/home"))){
-//                i.put("available", f.getUsableSpace());
-//                i.put("total", f.getTotalSpace());
+//                i.put("usableDiskSpace", f.getUsableSpace());
+//                i.put("totalDiskSpace", f.getTotalSpace());
 //                m.put(f.getVolume(), i);
 //            }
 //            // has overlay disk (container setup)
 //            else if (f.getVolume().equals("overlay") && ( f.getMount().equals("/") || f.getMount().equals("/home"))){
-//                    i.put("available", f.getUsableSpace());
-//                    i.put("total", f.getTotalSpace());
+//                    i.put("usableDiskSpace", f.getUsableSpace());
+//                    i.put("totalDiskSpace", f.getTotalSpace());
 //                    m.put(f.getVolume(), i);
 //            }
 //            // has NVME disk
 //            else if(f.getVolume().contains("/dev/nvme") && ( f.getMount().equals("/") || f.getMount().equals("/home"))) {
-//                i.put("available", f.getUsableSpace());
-//                i.put("total", f.getTotalSpace());
+//                i.put("usableDiskSpace", f.getUsableSpace());
+//                i.put("totalDiskSpace", f.getTotalSpace());
 //                m.put(f.getVolume(), i);
 //            }
 //            // disk on macOS
 //            else if (f.getVolume().contains("/dev/disk") && f.getMount().equals("/")) {
-//                i.put("available", f.getUsableSpace());
-//                i.put("total", f.getTotalSpace());
+//                i.put("usableDiskSpace", f.getUsableSpace());
+//                i.put("totalDiskSpace", f.getTotalSpace());
 //                m.put(f.getVolume(), i);
 //            }
         }
         if (m.isEmpty()) {
             Map<String, Long> i = new HashMap<>();
-            i.put("available", 0L);
-            i.put("total", 0L);
+            i.put("usableDiskSpace", 0L);
+            i.put("totalDiskSpace", 0L);
             m.put("n/a", i);
         }
         return m;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/AbstractNodeContainerResource.java
similarity index 54%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java
copy to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/AbstractNodeContainerResource.java
index 628ef09..86c4444 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/AbstractNodeContainerResource.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container.api;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,35 +15,47 @@ package org.apache.streampipes.node.controller.container.api;
  * limitations under the License.
  *
  */
-import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
-import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
+package org.apache.streampipes.node.controller.container.rest;
+
+import org.apache.streampipes.model.message.Message;
+import sun.security.provider.certpath.OCSPResponse;
 
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-@Path("/node")
-public class NodeInfoStatusResource {
+public abstract class AbstractNodeContainerResource {
 
-    @GET
-    @Path("/info")
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response getInfo() {
+    protected <T> Response ok(T entity) {
         return Response
                 .ok()
-                .entity(NodeInfoStorage.getInstance().retrieveNodeInfo())
+                .entity(entity)
                 .build();
     }
 
-    @GET
-    @Path("/status")
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response getStatus() {
+    protected <T> Response ok() {
         return Response
                 .ok()
-                .entity(ResourceManager.getInstance().retrieveNodeResources())
+                .build();
+    }
+
+    protected <T> Response error(T entity) {
+        return Response
+                .status(500)
+                .entity(entity)
+                .build();
+    }
+
+    protected Response statusMessage(Message message) {
+        return ok(message);
+    }
+
+    protected Response fail() {
+        return Response.serverError().build();
+    }
+
+    protected <T> Response fail(T entity) {
+        return Response
+                .serverError()
+                .entity(entity)
                 .build();
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
new file mode 100644
index 0000000..0ff1ca6
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
@@ -0,0 +1,55 @@
+package org.apache.streampipes.node.controller.container.rest;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
+import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+
+public class DebugRelayResource extends AbstractNodeContainerResource {
+
+    // TODO: Debug-only.
+    @POST
+    @Path("/relay/start")
+    public Response debugRelayEventStream(String msg) throws SpRuntimeException {
+        // TODO implement
+
+        System.out.println(msg);
+        EventRelayManager eventRelayManager = new EventRelayManager();
+        eventRelayManager.start();
+        RunningRelayInstances.INSTANCE.addRelay(eventRelayManager.getRelayedTopic(), eventRelayManager);
+
+        return ok();
+    }
+
+    @POST
+    @Path("/relay/stop")
+    public Response debugStopRelayEventStream(String msg) throws SpRuntimeException {
+        // TODO implement
+
+        System.out.println(msg);
+        EventRelayManager eventRelayManager = RunningRelayInstances.INSTANCE.get("org.apache.streampipes.flowrate01");
+        assert eventRelayManager != null;
+        eventRelayManager.stop();
+
+        return ok();
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/HealthCheckResource.java
similarity index 75%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/HealthCheckResource.java
index 06759b3..e4dddf4 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/HealthCheckResource.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container.api;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,6 +15,9 @@ package org.apache.streampipes.node.controller.container.api;
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.rest;
+
+import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -24,14 +26,11 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 @Path("/")
-public class NodeResource {
+public class HealthCheckResource extends AbstractNodeContainerResource{
 
     @GET
     @Produces(MediaType.APPLICATION_JSON)
     public Response getHealth() {
-        return Response
-                .ok()
-                .status(Response.Status.OK)
-                .build();
+        return ok(String.format("hello from node controller: %s", NodeControllerConfig.INSTANCE.getNodeControllerId()));
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
similarity index 77%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
index 628ef09..40b3812 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeInfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.api;
+package org.apache.streampipes.node.controller.container.rest;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -26,25 +26,19 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 @Path("/node")
-public class NodeInfoStatusResource {
+public class InfoStatusResource extends AbstractNodeContainerResource{
 
     @GET
     @Path("/info")
     @Produces(MediaType.APPLICATION_JSON)
     public Response getInfo() {
-        return Response
-                .ok()
-                .entity(NodeInfoStorage.getInstance().retrieveNodeInfo())
-                .build();
+        return ok(NodeInfoStorage.getInstance().retrieveNodeInfo());
     }
 
     @GET
     @Path("/status")
     @Produces(MediaType.APPLICATION_JSON)
     public Response getStatus() {
-        return Response
-                .ok()
-                .entity(ResourceManager.getInstance().retrieveNodeResources())
-                .build();
+        return ok(ResourceManager.getInstance().retrieveNodeResources());
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
similarity index 61%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainerResourceConfig.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
index 2704c0a..68930be 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/init/NodeControllerContainerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.init;
+package org.apache.streampipes.node.controller.container.rest;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,18 +17,18 @@ package org.apache.streampipes.node.controller.container.init;
  *
  */
 
-import org.apache.streampipes.node.controller.container.api.NodeControllerResource;
-import org.apache.streampipes.node.controller.container.api.NodeInfoStatusResource;
-import org.apache.streampipes.node.controller.container.api.NodeResource;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.springframework.stereotype.Component;
 
 @Component
-public class NodeControllerContainerResourceConfig extends ResourceConfig {
+public class NodeControllerResourceConfig extends ResourceConfig {
 
-    public NodeControllerContainerResourceConfig() {
-        register(NodeResource.class);
-        register(NodeInfoStatusResource.class);
-        register(NodeControllerResource.class);
+    public NodeControllerResourceConfig() {
+        register(HealthCheckResource.class);
+        register(InfoStatusResource.class);
+        register(PELifeCycleResource.class);
+
+        // TODO remove later - only for local relay tests
+        register(DebugRelayResource.class);
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java
similarity index 73%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java
index fb77e4b..091331b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.api;/*
+package org.apache.streampipes.node.controller.container.rest;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,8 +17,6 @@ package org.apache.streampipes.node.controller.container.api;/*
  */
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.container.declarer.InvocableDeclarer;
-import org.apache.streampipes.container.init.RunningInstances;
 import org.apache.streampipes.container.transform.Transformer;
 import org.apache.streampipes.container.util.Util;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -29,7 +27,6 @@ import org.apache.streampipes.node.controller.container.management.container.Doc
 import org.apache.streampipes.node.controller.container.management.pe.PipelineElementManager;
 import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
 import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
-import org.eclipse.paho.client.mqttv3.MqttException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,33 +36,41 @@ import javax.ws.rs.core.Response;
 import java.io.IOException;
 
 @Path("/node/container")
-public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
+public class PELifeCycleResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
     private static final Logger LOG =
-            LoggerFactory.getLogger(NodeControllerResource.class.getCanonicalName());
-
+            LoggerFactory.getLogger(PELifeCycleResource.class.getCanonicalName());
 
     private static final String COLON = ":";
 
+    /**
+     *
+     * @return a list of currently running Docker Containers
+     */
     @GET
     @Produces(MediaType.APPLICATION_JSON)
     public Response getPipelineElementContainer(){
-        return Response
-                .ok()
-                .entity(DockerOrchestratorManager.getInstance().list())
-                .build();
+        return ok(DockerOrchestratorManager.getInstance().list());
     }
 
+    /**
+     * Deploys a new Docker Container
+     *
+     * @param container to be deployed
+     * @return deployment status
+     */
     @POST
     @Path("/deploy")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     public Response deployPipelineElementContainer(PipelineElementDockerContainer container) {
-        return Response
-                .ok()
-                .entity(DockerOrchestratorManager.getInstance().deploy(container))
-                .build();
+        return ok(DockerOrchestratorManager.getInstance().deploy(container));
     }
 
+    /**
+     * Register pipeline elements in consul
+     * @param message
+     * @return
+     */
     @POST
     @Path("/register")
     @Consumes(MediaType.APPLICATION_JSON)
@@ -81,10 +86,7 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
 //                .build();
 //        client.execute(request);
 
-        return Response
-                .ok()
-                .status(Response.Status.OK)
-                .build();
+        return ok();
     }
 
     @POST
@@ -117,9 +119,7 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
         } catch (IOException e) {
             e.printStackTrace();
         }
-        return Response
-                .ok()
-                .build();
+        return ok();
     }
 
     // TODO move endpoint to /elementId/instances/runningInstanceId
@@ -144,9 +144,7 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
         EventRelayManager relay = RunningRelayInstances.INSTANCE.removeRelay(appId);
         relay.stop();
 
-        return Response
-                .ok()
-                .build();
+        return ok();
     }
 
     @DELETE
@@ -154,38 +152,7 @@ public class NodeControllerResource<I extends InvocableStreamPipesEntity> {
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     public Response removePipelineElementContainer(PipelineElementDockerContainer container) {
-        return Response
-                .ok(DockerOrchestratorManager.getInstance().remove(container))
-                .build();
-    }
-
-    // TODO: Debug-only.
-    @POST
-    @Path("/relay/start")
-    public Response debugRelayEventStream(String msg) throws SpRuntimeException {
-        // TODO implement
-
-        System.out.println(msg);
-        EventRelayManager eventRelayManager = new EventRelayManager();
-        eventRelayManager.start();
-        RunningRelayInstances.INSTANCE.addRelay(eventRelayManager.getRelayedTopic(), eventRelayManager);
-
-        return Response
-                .ok()
-                .build();
+        return ok(DockerOrchestratorManager.getInstance().remove(container));
     }
 
-    @POST
-    @Path("/relay/stop")
-    public Response debugStopRelayEventStream(String msg) throws SpRuntimeException {
-        // TODO implement
-
-        System.out.println(msg);
-        EventRelayManager eventRelayManager = RunningRelayInstances.INSTANCE.get("org.apache.streampipes.flowrate01");
-        eventRelayManager.stop();
-
-        return Response
-                .ok()
-                .build();
-    }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
index 2999aca..94bd5cb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
@@ -93,8 +93,8 @@ public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableS
                         + pipelineElement.getDeploymentTargetNodeId()
                         + SLASH;
 
-                String host = ConsulUtil.getElementEndpointHostname(route + PE_HOST_KEY);
-                int port = ConsulUtil.getElementEndpointPort(route + PE_PORT_KEY);
+                String host = ConsulUtil.getStringValue(route + PE_HOST_KEY);
+                int port = ConsulUtil.getIntValue(route + PE_PORT_KEY);
 
                 // Necessary because secondary pipeline element description is not stored in backend
                 // It uses information from primary pipeline element. Node controller will locally forward
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index 3f11c1a..80c9acd 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.manager.matching;
 
 import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.container.util.ConsulUtil;
 import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphHelpers;
 import org.apache.streampipes.manager.matching.output.OutputSchemaFactory;
@@ -27,7 +28,7 @@ import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
 import org.apache.streampipes.model.output.OutputStrategy;
 import org.apache.streampipes.model.schema.EventSchema;
@@ -41,75 +42,97 @@ import java.util.stream.Collectors;
 
 public class InvocationGraphBuilder {
 
-  private PipelineGraph pipelineGraph;
-  private String pipelineId;
+  private final PipelineGraph pipelineGraph;
+  private final String pipelineId;
   private Integer uniquePeIndex = 0;
-
-  private List<InvocableStreamPipesEntity> graphs;
+  private final List<InvocableStreamPipesEntity> graphs;
 
   public InvocationGraphBuilder(PipelineGraph pipelineGraph, String pipelineId) {
     this.graphs = new ArrayList<>();
     this.pipelineGraph = pipelineGraph;
     this.pipelineId = pipelineId;
-
   }
 
   public List<InvocableStreamPipesEntity> buildGraphs() {
 
-    List<SpDataStream> streams = PipelineGraphHelpers.findStreams(pipelineGraph);
-
-    for (SpDataStream stream : streams) {
-      Set<InvocableStreamPipesEntity> connectedElements = getConnections(stream);
-      configure(stream, connectedElements);
-    }
+    for (SpDataStream stream : PipelineGraphHelpers.findStreams(pipelineGraph)) {
+      configure(stream, getConnections(stream));
+    }
 
     return graphs;
   }
 
   private void configure(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
 
-    EventGrounding inputGrounding = new GroundingBuilder(source, targets)
-            .getEventGrounding();
+    EventGrounding inputGrounding = new GroundingBuilder(source, targets).getEventGrounding();
 
+    // set output stream event grounding for source data processors
     if (source instanceof InvocableStreamPipesEntity) {
       if (source instanceof DataProcessorInvocation && ((DataProcessorInvocation) source).isConfigured()) {
 
-        DataProcessorInvocation dataProcessorInvocation = (DataProcessorInvocation) source;
-        Tuple2<EventSchema, ? extends OutputStrategy> outputSettings;
-        OutputSchemaGenerator<?> schemaGenerator = new OutputSchemaFactory(dataProcessorInvocation)
-                .getOuputSchemaGenerator();
-
-        if (((DataProcessorInvocation) source).getInputStreams().size() == 1) {
-          outputSettings = schemaGenerator.buildFromOneStream(dataProcessorInvocation
-                  .getInputStreams()
-                  .get(0));
-        } else if (graphExists(dataProcessorInvocation.getDOM())) {
-          DataProcessorInvocation existingInvocation = (DataProcessorInvocation) find(dataProcessorInvocation.getDOM());
-
-          outputSettings = schemaGenerator.buildFromTwoStreams(existingInvocation
-                  .getInputStreams().get(0), dataProcessorInvocation.getInputStreams().get(1));
-          graphs.remove(existingInvocation);
-        } else {
-          outputSettings = new Tuple2<>(new EventSchema(), dataProcessorInvocation
-                  .getOutputStrategies().get(0));
-        }
+        DataProcessorInvocation sourceInvokation = (DataProcessorInvocation) source;
 
-        SpDataStream outputStream = new SpDataStream();
-        outputStream.setEventGrounding(inputGrounding);
-        dataProcessorInvocation.setOutputStrategies(Collections.singletonList(outputSettings.b));
-        outputStream.setEventSchema(outputSettings.a);
-        ((DataProcessorInvocation) source).setOutputStream(outputStream);
+        Tuple2<EventSchema, ? extends OutputStrategy> outputSettings = getOutputSettings(sourceInvokation);
+        sourceInvokation.setOutputStrategies(Collections.singletonList(outputSettings.b));
+        sourceInvokation.setOutputStream(makeOutputStream(inputGrounding, outputSettings));
       }
-
       if (!graphExists(source.getDOM())) {
         graphs.add((InvocableStreamPipesEntity) source);
       }
     }
 
+    // set input stream event grounding for target element data processors and sinks
     targets.forEach(t -> {
-      t.getInputStreams()
-              .get(getIndex(source.getDOM(), t))
-              .setEventGrounding(inputGrounding);
+      // check if source and target share same node
+      if (source instanceof InvocableStreamPipesEntity) {
+        if (((InvocableStreamPipesEntity) source).getDeploymentTargetNodeId() != null ||
+                t.getDeploymentTargetNodeId() != null) {
+
+          if (matchingDeploymentTarget((InvocableStreamPipesEntity) source, t)) {
+            // shared grounding
+            // TODO: set event relay to false
+            t.getInputStreams()
+                    .get(getIndex(source.getDOM(), t))
+                    .setEventGrounding(inputGrounding);
+
+          } else {
+            // cloud vs. edge target; a target without node id is handled like the "default" cloud node
+            if (t.getDeploymentTargetNodeId() == null || t.getDeploymentTargetNodeId().equals("default")) {
+
+              // target runs on cloud node: use central cloud broker, e.g. kafka
+              // TODO: set event relay to true
+              // TODO: add cloud broker to List<EventRelays>
+              t.getInputStreams()
+                      .get(getIndex(source.getDOM(), t))
+                      .setEventGrounding(inputGrounding);
+
+            } else {
+              // target runs on edge node: use target edge node broker
+              // TODO: set event relay to true
+              // TODO: add target edge node broker to List<EventRelays>
+
+              String broker = getEdgeBroker(t);
+
+              t.getInputStreams()
+                      .get(getIndex(source.getDOM(), t))
+                      .setEventGrounding(inputGrounding);
+            }
+          }
+        } else {
+          t.getInputStreams()
+                  .get(getIndex(source.getDOM(), t))
+                  .setEventGrounding(inputGrounding);
+        }
+      } else {
+        t.getInputStreams()
+                .get(getIndex(source.getDOM(), t))
+                .setEventGrounding(inputGrounding);
+      }
+
+      // old
+//      t.getInputStreams()
+//              .get(getIndex(source.getDOM(), t))
+//              .setEventGrounding(inputGrounding);
 
       t.getInputStreams()
               .get(getIndex(source.getDOM(), t))
@@ -128,7 +151,56 @@ public class InvocationGraphBuilder {
       configure(t, getConnections(t));
 
     });
+  }
 
+  private Tuple2<EventSchema,? extends OutputStrategy> getOutputSettings(DataProcessorInvocation dataProcessorInvocation) {
+    OutputSchemaGenerator<?> generator = new OutputSchemaFactory(dataProcessorInvocation)
+            .getOuputSchemaGenerator();
+
+    // exactly one input stream: the schema is built from that stream alone
+    if (dataProcessorInvocation.getInputStreams().size() == 1) {
+      return generator.buildFromOneStream(dataProcessorInvocation.getInputStreams().get(0));
+    }
+
+    // graphExists(...) is false: fall back to an empty schema and the first configured output strategy
+    if (!graphExists(dataProcessorInvocation.getDOM())) {
+      return new Tuple2<>(new EventSchema(), dataProcessorInvocation.getOutputStrategies().get(0));
+    }
+
+    // graphExists(...) is true: build from the first input of the invocation returned by find(...) and
+    // the second input of this invocation, then remove the found invocation from graphs
+    DataProcessorInvocation registered = (DataProcessorInvocation) find(dataProcessorInvocation.getDOM());
+    Tuple2<EventSchema,? extends OutputStrategy> combined = generator.buildFromTwoStreams(
+            registered.getInputStreams().get(0), dataProcessorInvocation.getInputStreams().get(1));
+    graphs.remove(registered);
+    return combined;
+  }
+
+  private SpDataStream makeOutputStream(EventGrounding inputGrounding, Tuple2<EventSchema,? extends OutputStrategy> outputSettings) {
+    // fresh stream: schema comes from tuple element a, grounding is passed through unchanged
+    SpDataStream stream = new SpDataStream();
+    stream.setEventSchema(outputSettings.a);
+    stream.setEventGrounding(inputGrounding);
+    return stream;
+  }
+
+  private String getEdgeBroker(InvocableStreamPipesEntity target) {
+    // key under the node controller's config path, built from the target's deployment node id
+    String brokerHostKey = "sp/v1/node/org.apache.streampipes.node.controller/"
+            + target.getDeploymentTargetNodeId() + "/config/SP_NODE_BROKER_HOST";
+    return ConsulUtil.getStringValue(brokerHostKey);
+  }
+
+
+  private boolean matchingDeploymentTarget(InvocableStreamPipesEntity source, InvocableStreamPipesEntity target) {
+    // a shared grounding is only considered between two data processors
+    if (!(source instanceof DataProcessorInvocation && target instanceof DataProcessorInvocation)) {
+      return false;
+    }
+    // null-safe comparison: the caller only guarantees that one of the two node ids is set, so
+    // calling equals() on an unset source id would throw; an unset id never counts as a match
+    return source.getDeploymentTargetNodeId() != null
+            && source.getDeploymentTargetNodeId().equals(target.getDeploymentTargetNodeId());
   }
 
   private ElementStatusInfoSettings makeStatusInfoSettings(String elementIdentifier) {
@@ -165,13 +237,11 @@ public class InvocationGraphBuilder {
   }
 
   private Set<InvocableStreamPipesEntity> getConnections(NamedStreamPipesEntity source) {
-    Set<String> outgoingEdges = pipelineGraph.outgoingEdgesOf(source);
-    return outgoingEdges
+    return pipelineGraph.outgoingEdgesOf(source)
             .stream()
             .map(o -> pipelineGraph.getEdgeTarget(o))
             .map(g -> (InvocableStreamPipesEntity) g)
             .collect(Collectors.toSet());
-
   }
 
   private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity targetElement) {
@@ -191,5 +261,4 @@ public class InvocationGraphBuilder {
             .findFirst()
             .get();
   }
-
 }
\ No newline at end of file
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index ec3a891..f38212e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -34,14 +34,15 @@ import java.util.Set;
 
 public class ProtocolSelector extends GroundingSelector {
 
-    private String outputTopic;
-    private List<SpProtocol> prioritizedProtocols;
+    private final String outputTopic;
+    private final List<SpProtocol> prioritizedProtocols;
 
     public ProtocolSelector(NamedStreamPipesEntity source, Set<InvocableStreamPipesEntity> targets) {
         super(source, targets);
         this.outputTopic = TopicGenerator.generateRandomTopic();
-        this.prioritizedProtocols =
-                BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols();
+        this.prioritizedProtocols = BackendConfig.INSTANCE
+                .getMessagingSettings()
+                .getPrioritizedProtocols();
     }
 
     public TransportProtocol getPreferredProtocol() {
@@ -50,36 +51,34 @@ public class ProtocolSelector extends GroundingSelector {
                     .getEventGrounding()
                     .getTransportProtocol();
         } else {
-            for(SpProtocol prioritizedProtocol: prioritizedProtocols) {
-                if (prioritizedProtocol.getProtocolClass().equals(KafkaTransportProtocol.class.getCanonicalName()) &&
-                        supportsProtocol(KafkaTransportProtocol.class)) {
-                    return kafkaTopic();
-                }
-                else if (prioritizedProtocol.getProtocolClass().equals(JmsTransportProtocol.class.getCanonicalName()) &&
-                        supportsProtocol(JmsTransportProtocol.class)) {
-                    return jmsTopic();
-                } else if (prioritizedProtocol.getProtocolClass().equals(MqttTransportProtocol.class.getCanonicalName()) &&
-                        supportsProtocol(MqttTransportProtocol.class)) {
-                    return mqttTopic();
+            for(SpProtocol p: prioritizedProtocols) {
+                if (matches(p, KafkaTransportProtocol.class) && supportsProtocol(KafkaTransportProtocol.class)) {
+                    return kafkaTransportProtocol();
+                } else if (matches(p, JmsTransportProtocol.class) && supportsProtocol(JmsTransportProtocol.class)) {
+                    return jmsTransportProtocol();
+                } else if (matches(p, MqttTransportProtocol.class) && supportsProtocol(MqttTransportProtocol.class)) {
+                    return mqttTransportProtocol();
+                    // otherwise: do not throw here - a lower-prioritized protocol may still be supported by
+                    // all elements; the throw after the loop covers the case that none is usable
                 }
             }
         }
-        return kafkaTopic();
+        throw new IllegalArgumentException("Could not get preferred transport protocol");
     }
 
-    private TransportProtocol mqttTopic() {
+    private TransportProtocol mqttTransportProtocol() {
         return new MqttTransportProtocol(BackendConfig.INSTANCE.getMqttHost(),
                 BackendConfig.INSTANCE.getMqttPort(),
                 outputTopic);
     }
 
-    private TransportProtocol jmsTopic() {
+    private TransportProtocol jmsTransportProtocol() {
         return new JmsTransportProtocol(BackendConfig.INSTANCE.getJmsHost(),
                 BackendConfig.INSTANCE.getJmsPort(),
                 outputTopic);
     }
 
-    private TransportProtocol kafkaTopic() {
+    private TransportProtocol kafkaTransportProtocol() {
         return new KafkaTransportProtocol(BackendConfig.INSTANCE.getKafkaHost(),
                 BackendConfig.INSTANCE.getKafkaPort(),
                 outputTopic,
@@ -87,10 +86,12 @@ public class ProtocolSelector extends GroundingSelector {
                 BackendConfig.INSTANCE.getZookeeperPort());
     }
 
+    private <T extends TransportProtocol> boolean matches(SpProtocol p, Class<T> clazz) {
+        return p.getProtocolClass().equals(clazz.getCanonicalName());
+    }
 
-    public <T extends TransportProtocol> boolean supportsProtocol(Class<T> protocol) {
+    private <T extends TransportProtocol> boolean supportsProtocol(Class<T> protocol) {
         List<InvocableStreamPipesEntity> elements = buildInvocables();
-
         return elements
                 .stream()
                 .allMatch(e -> e
@@ -98,6 +99,5 @@ public class ProtocolSelector extends GroundingSelector {
                         .getTransportProtocols()
                         .stream()
                         .anyMatch(protocol::isInstance));
-
     }
 }
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index 9a8d673..8897ffa 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -418,4 +418,5 @@ public class StreamPipes {
   public static final String PE_CONFIGURED = NS + "isPeConfigured" ;
 
   public static final String HAS_REQUIRED_FILETYPES = NS + "hasRequiredFiletypes" ;
+  public static final String HAS_EVENT_STREAM_RELAYS = NS + "hasEventStreamRelays";
 }
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
index f8c2199..fb900b7 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
@@ -49,7 +49,18 @@
                 <mat-slide-toggle color="primary" [(ngModel)]="advancedSettings">
                     Choose deployment options
                 </mat-slide-toggle>
-                <mat-divider style="margin: 2em 0 2em 0;"></mat-divider>
+                <mat-divider *ngIf="advancedSettings" style="margin: 2em 0 2em 0;"></mat-divider>
+<!--                <div *ngIf="advancedSettings">-->
+<!--                    <div fxFlex="100" fxLayout="row">-->
+<!--                        <div fxFlex="50" fxLayout="row" fxLayoutAlign="center center">-->
+<!--                            Event strategy-->
+<!--                        </div>-->
+<!--                    </div>-->
+<!--                    <div fxFlex="50" fxLayout="row" fxLayoutAlign="start center">-->
+<!--                        <mat-slider></mat-slider>-->
+<!--                    </div>-->
+<!--                </div>-->
+                <mat-divider *ngIf="advancedSettings" style="margin: 2em 0 2em 0;"></mat-divider>
                 <div *ngIf="advancedSettings">
                     <b>Node selection</b>
                     <div *ngFor="let processors of pipeline.sepas">
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.scss b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.scss
index c5dce8d..2f8c6bb 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.scss
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.scss
@@ -43,4 +43,8 @@
 
 .status-subtext {
   font-size: 12pt;
+}
+
+mat-slider {
+  width: 300px;
 }
\ No newline at end of file
diff --git a/ui/src/app/editor/editor.module.ts b/ui/src/app/editor/editor.module.ts
index db54354..d44fc86 100644
--- a/ui/src/app/editor/editor.module.ts
+++ b/ui/src/app/editor/editor.module.ts
@@ -59,6 +59,7 @@ import {CustomOutputStrategyComponent} from "./components/output-strategy/custom
 import {PropertySelectionComponent} from "./components/output-strategy/property-selection/property-selection.component";
 import {UserDefinedOutputStrategyComponent} from "./components/output-strategy/user-defined-output/user-defined-output.component";
 import {ConnectModule} from "../connect/connect.module";
+import {MatSliderModule} from "@angular/material/slider";
 
 @NgModule({
     imports: [
@@ -75,7 +76,8 @@ import {ConnectModule} from "../connect/connect.module";
         FormsModule,
         MatProgressSpinnerModule,
         ShowdownModule,
-        ReactiveFormsModule
+        ReactiveFormsModule,
+        MatSliderModule
     ],
     declarations: [
         CompatibleElementsComponent,