You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/10/04 20:42:25 UTC

[incubator-streampipes] branch edge-extensions updated (8f040a3 -> 997d4a7)

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a change to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from 8f040a3  add support for reconfigurable CondeInputStaticProperty
     new 87a23b1  edit node add details
     new 997d4a7  [hotfix] set broker according to deployment target

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docker-save.sh                                     |  11 +-
 streampipes-connect/pom.xml                        |  16 +--
 .../streampipes/connect/adapter/Adapter.java       |  75 +++++++++-----
 .../connect/adapter/GroundingService.java          | 113 +++++++++++++--------
 .../api/DataSinkPipelineElementResource.java       |   2 +-
 streampipes-node-controller/arm.Dockerfile         |   6 +-
 .../node/controller/config/EnvConfigParam.java     |   1 +
 .../node/controller/config/NodeConfiguration.java  |  13 +++
 .../management/NodeControllerSubmitter.java        |   2 +-
 .../docker/DockerContainerDeclarerSingleton.java   |  11 +-
 .../relay/bridges/MultiBrokerBridge.java           |   6 +-
 .../management/resource/utils/FileSystemType.java  |   3 +-
 .../management/resource/utils/ResourceUtils.java   |   3 +
 .../statscollector/DockerStatsCollector.java       |  50 ++++++++-
 .../statscollector/DockerStatsUtils.java           |  15 +++
 .../manager/matching/InvocationGraphBuilder.java   |  31 ++++--
 .../manager/matching/ProtocolSelector.java         |  40 ++++++--
 .../node-add-details.component.html                |  21 ++++
 .../node-add-details.component.scss                |  10 ++
 .../node-add-details/node-add-details.component.ts |  23 ++++-
 .../node-configuration-details.component.ts        |   1 +
 .../save-pipeline/save-pipeline.component.ts       |  22 ++--
 22 files changed, 352 insertions(+), 123 deletions(-)

[incubator-streampipes] 01/02: edit node add details

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 87a23b1244357d0096a419ad64200fe5cc9bf4eb
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon Sep 13 16:11:22 2021 +0200

    edit node add details
---
 .../node-add-details.component.html                | 21 ++++++++++++++++++++
 .../node-add-details.component.scss                | 10 ++++++++++
 .../node-add-details/node-add-details.component.ts | 23 +++++++++++++++++-----
 3 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.html b/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.html
index c05a54d..5ea8afa 100644
--- a/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.html
+++ b/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.html
@@ -36,6 +36,27 @@
                         <mat-label>API token</mat-label>
                         <input [formControlName]="'token'" matInput required>
                     </mat-form-field>
+
+                    <label id="example-radio-group-label">Pick associated resource layer</label>
+                    <mat-radio-group
+                            aria-labelledby="example-radio-group-label"
+                            class="resource-layer-radio-group"
+                            [(ngModel)]="associatedResourceLayer">
+                        <mat-radio-button class="resource-layer-radio-button"
+                                          [required]="true"
+                                          *ngFor="let layer of resourceLayers"
+                                          [value]="layer">{{layer}}
+                        </mat-radio-button>
+                    </mat-radio-group>
+
+                    <mat-form-field>
+                        <mat-label>Latitude</mat-label>
+                        <input [formControlName]="'latitude'" matInput required>
+                    </mat-form-field>
+                    <mat-form-field>
+                        <mat-label>Longitude</mat-label>
+                        <input [formControlName]="'longitude'" matInput required>
+                    </mat-form-field>
                 </div>
             </form>
 
diff --git a/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.scss b/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.scss
index 77add90..fbebfc4 100644
--- a/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.scss
+++ b/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.scss
@@ -124,3 +124,13 @@ mat-slider {
   overflow-x: auto;
   white-space: nowrap;
 }
+
+.resource-layer-radio-group {
+  display: flex;
+  flex-direction: row;
+  margin: 15px 0;
+}
+
+.resource-layer-radio-button {
+  margin: 5px;
+}
diff --git a/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.ts b/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.ts
index b41bd7b..cad66dd 100644
--- a/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.ts
+++ b/ui/src/app/configuration/node-configuration/node-add-details/node-add-details.component.ts
@@ -55,10 +55,16 @@ export class NodeAddDetailsComponent implements OnInit {
   backend: string = '';
   nodeIp: string = '';
   apiToken: string = '';
+  latitude: number;
+  longitude: number;
   storagePath: string = '/var/lib/streampipes';
   resourceUpdateFreq: string = '30';
   dockerNodePruningFreq: string = '3600';
   eventBufferSize: string = '1000';
+  associatedResourceLayer: string;
+  resourceLayers: string[] = ['edge', 'fog', 'cloud'];
+  nodeType: string;
+  nodeTypes: string[] = ['virtual','physical'];
 
   dockerRunCommand: string;
 
@@ -75,31 +81,38 @@ export class NodeAddDetailsComponent implements OnInit {
   ngOnInit(): void {
     this.dockerCommandCreated = false;
     this.advancedSettings = false;
+    this.associatedResourceLayer = 'edge';
     this.tmpTags = []
     this.getVersion();
 
     this.nodeMetadataFormGroup.addControl("backendHost", new FormControl(this.backend,
         [Validators.required,
           Validators.maxLength(40)]))
-
     this.nodeMetadataFormGroup.addControl("nodeHost", new FormControl(this.nodeIp,
         [Validators.required,
-          Validators.maxLength(15)]))
+          Validators.maxLength(40)]))
     this.nodeMetadataFormGroup.addControl("token", new FormControl(this.apiToken,
         [Validators.maxLength(80)]))
-
+    this.nodeMetadataFormGroup.addControl("latitude", new FormControl(this.latitude,
+        [Validators.required]))
+    this.nodeMetadataFormGroup.addControl("longitude", new FormControl(this.longitude,
+        [Validators.required]))
 
     this.nodeMetadataFormGroup.controls["backendHost"].valueChanges.subscribe(value => {
       this.backend = value;
     });
-
     this.nodeMetadataFormGroup.controls["nodeHost"].valueChanges.subscribe(value => {
       this.nodeIp = value;
     });
-
     this.nodeMetadataFormGroup.controls["token"].valueChanges.subscribe(value => {
       this.apiToken = value;
     });
+    this.nodeMetadataFormGroup.controls["latitude"].valueChanges.subscribe(value => {
+      this.latitude = value;
+    });
+    this.nodeMetadataFormGroup.controls["longitude"].valueChanges.subscribe(value => {
+      this.longitude = value;
+    });
 
     this.firstFormGroup = this._formBuilder.group({
       firstCtrl: ['', Validators.required]

[incubator-streampipes] 02/02: [hotfix] set broker according to deployment target

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 997d4a77b5c53ef8443d31c9e7057b9aa1e37851
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon Oct 4 22:41:50 2021 +0200

    [hotfix] set broker according to deployment target
---
 docker-save.sh                                     |  11 +-
 streampipes-connect/pom.xml                        |  16 +--
 .../streampipes/connect/adapter/Adapter.java       |  75 +++++++++-----
 .../connect/adapter/GroundingService.java          | 113 +++++++++++++--------
 .../api/DataSinkPipelineElementResource.java       |   2 +-
 streampipes-node-controller/arm.Dockerfile         |   6 +-
 .../node/controller/config/EnvConfigParam.java     |   1 +
 .../node/controller/config/NodeConfiguration.java  |  13 +++
 .../management/NodeControllerSubmitter.java        |   2 +-
 .../docker/DockerContainerDeclarerSingleton.java   |  11 +-
 .../relay/bridges/MultiBrokerBridge.java           |   6 +-
 .../management/resource/utils/FileSystemType.java  |   3 +-
 .../management/resource/utils/ResourceUtils.java   |   3 +
 .../statscollector/DockerStatsCollector.java       |  50 ++++++++-
 .../statscollector/DockerStatsUtils.java           |  15 +++
 .../manager/matching/InvocationGraphBuilder.java   |  31 ++++--
 .../manager/matching/ProtocolSelector.java         |  40 ++++++--
 .../node-configuration-details.component.ts        |   1 +
 .../save-pipeline/save-pipeline.component.ts       |  22 ++--
 19 files changed, 303 insertions(+), 118 deletions(-)

diff --git a/docker-save.sh b/docker-save.sh
index ee6cb17..1ff219e 100755
--- a/docker-save.sh
+++ b/docker-save.sh
@@ -24,13 +24,13 @@ docker_bundled_edge_tar=bundled-edge-img.tar
 docker_bundled_edge_arm_tar=bundled-edge-img-armv7.tar
 docker_bundled_edge_aarch64_tar=bundled-edge-img-aarch64.tar
 docker_bundled_core_tar=bundled-core-img.tar
+docker_bundled_cloud_tar=bundled-cloud-img.tar
 
 docker_img_edge=(
 $repo/node-controller:$version \
 $repo/extensions-all-jvm:$version \
 eclipse-mosquitto:1.6.12 )
 
-
 docker_img_edge_arm=(
 $repo/node-controller:$version-armv7 \
 $repo/extensions-all-jvm:$version-armv7 )
@@ -50,6 +50,11 @@ fogsyio/kafka:2.2.0 \
 fogsyio/zookeeper:3.4.13 \
 fogsyio/influxdb:1.7 )
 
+docker_img_cloud=(
+$repo/node-controller:$version \
+$repo/extensions-all-jvm:$version
+)
+
 docker_save_bundle(){
   echo "Start saving Docker images to tar ..."
   create_dir_if_not_exists
@@ -67,6 +72,9 @@ docker_save_bundle(){
   elif [ "$1" == "core" ]; then
       echo "Save core images to tar ..."
       docker save ${docker_img_core[@]} -o $dir/$docker_bundled_core_tar
+  elif [ "$1" == "cloud" ]; then
+      echo "Save cloud images to tar ..."
+      docker save ${docker_img_cloud[@]} -o $dir/$docker_bundled_cloud_tar
   else
       echo "Save all images to tar ..."
       docker save ${docker_img_edge[@]} -o $dir/$docker_bundled_edge_tar
@@ -88,6 +96,7 @@ Usage: ./docker-save.sh core
        ./docker-save.sh edge
        ./docker-save.sh edge armv7
        ./docker-save.sh edge aarch64
+       ./docker-save.sh cloud
 EOF
 }
 
diff --git a/streampipes-connect/pom.xml b/streampipes-connect/pom.xml
index d517948..3e187e1 100755
--- a/streampipes-connect/pom.xml
+++ b/streampipes-connect/pom.xml
@@ -99,6 +99,16 @@
             <artifactId>streampipes-serializers-json</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-storage-management</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-node-management</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
 
         <!-- External dependencis -->
         <dependency>
@@ -167,11 +177,5 @@
             <artifactId>powermock-api-mockito2</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-storage-management</artifactId>
-            <version>0.68.0-SNAPSHOT</version>
-            <scope>compile</scope>
-        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index 183018e..4768e02 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.config.backend.SpProtocol;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.node.management.NodeManagement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.connect.adapter.exception.AdapterException;
@@ -165,36 +166,49 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
 
     private SendToBrokerAdapterSink<?> getAdapterSink(AdapterDescription adapterDescription) throws AdapterException {
 
-        if (adapterDescription.getDeploymentTargetNodeId() != null) {
-            // use edge protocol
-            SpEdgeNodeProtocol edgeNodeProtocol = BackendConfig.INSTANCE
-                    .getMessagingSettings()
-                    .getPrioritizedEdgeProtocols()
-                    .get(0);
-
-            if (GroundingService.isEdgeProtocol(edgeNodeProtocol, MqttTransportProtocol.class)) {
-                return new SendToMqttAdapterSink(adapterDescription);
-            } else if (GroundingService.isEdgeProtocol(edgeNodeProtocol, KafkaTransportProtocol.class)) {
-                return new SendToKafkaAdapterSink(adapterDescription);
+        if (adapterManagedByNodeController(adapterDescription)) {
+
+            if (isEdgeOrFogNodeTarget(adapterDescription)) {
+                return createSendToEdgeOrFogBrokerAdapterSink(adapterDescription);
             } else {
-                throw new AdapterException("Edge node protocol not supported. " + edgeNodeProtocol);
+                return createSendToCloudBrokerAdapterSink(adapterDescription);
             }
 
         } else {
-            SpProtocol prioritizedProtocol = BackendConfig.INSTANCE
-                    .getMessagingSettings()
-                    .getPrioritizedProtocols()
-                    .get(0);
+            return createSendToCloudBrokerAdapterSink(adapterDescription);
+        }
+    }
 
-            if (GroundingService.isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
-                return new SendToJmsAdapterSink(adapterDescription);
-            }
-            else if (GroundingService.isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)) {
-                return new SendToKafkaAdapterSink(adapterDescription);
-            }
-            else {
-                return new SendToMqttAdapterSink(adapterDescription);
-            }
+    private SendToBrokerAdapterSink<?> createSendToCloudBrokerAdapterSink(AdapterDescription adapterDescription) {
+        SpProtocol prioritizedProtocol = BackendConfig.INSTANCE
+                .getMessagingSettings()
+                .getPrioritizedProtocols()
+                .get(0);
+
+        if (GroundingService.isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
+            return new SendToJmsAdapterSink(adapterDescription);
+        }
+        else if (GroundingService.isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)) {
+            return new SendToKafkaAdapterSink(adapterDescription);
+        }
+        else {
+            return new SendToMqttAdapterSink(adapterDescription);
+        }
+    }
+
+    private SendToBrokerAdapterSink<?> createSendToEdgeOrFogBrokerAdapterSink(AdapterDescription adapterDescription) throws AdapterException {
+        // use edge protocol
+        SpEdgeNodeProtocol edgeNodeProtocol = BackendConfig.INSTANCE
+                .getMessagingSettings()
+                .getPrioritizedEdgeProtocols()
+                .get(0);
+
+        if (GroundingService.isEdgeProtocol(edgeNodeProtocol, MqttTransportProtocol.class)) {
+            return new SendToMqttAdapterSink(adapterDescription);
+        } else if (GroundingService.isEdgeProtocol(edgeNodeProtocol, KafkaTransportProtocol.class)) {
+            return new SendToKafkaAdapterSink(adapterDescription);
+        } else {
+            throw new AdapterException("Edge node protocol not supported. " + edgeNodeProtocol);
         }
     }
 
@@ -235,4 +249,15 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
         return debug;
     }
 
+    private static boolean adapterManagedByNodeController(AdapterDescription desc) {
+        return desc.getDeploymentTargetNodeId() != null && !desc.getDeploymentTargetNodeId().equals("default");
+    }
+
+    private static boolean isEdgeOrFogNodeTarget(AdapterDescription desc) {
+        return NodeManagement.getInstance().getAllNodes().stream()
+                .filter(n -> n.getNodeControllerId().equals(desc.getDeploymentTargetNodeId()))
+                .anyMatch(n -> n.getStaticNodeMetadata().getType().equals("edge") ||
+                        n.getStaticNodeMetadata().getType().equals("fog"));
+    }
+
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
index 954cf59..3194326 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/GroundingService.java
@@ -23,8 +23,6 @@ import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.config.backend.SpEdgeNodeProtocol;
 import org.apache.streampipes.config.backend.SpProtocol;
 import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToMqttAdapterSink;
 import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
 import org.apache.streampipes.model.connect.adapter.*;
 import org.apache.streampipes.model.grounding.EventGrounding;
@@ -35,6 +33,7 @@ import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
 import org.apache.streampipes.model.grounding.TopicDefinition;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.node.management.NodeManagement;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import java.util.Collections;
@@ -92,54 +91,16 @@ public class GroundingService {
         String topic = TOPIC_PREFIX + UUID.randomUUID().toString();
         TopicDefinition topicDefinition = new SimpleTopicDefinition(topic);
 
-        if (adapterDescription.getDeploymentTargetNodeId() != null) {
+        if (adapterManagedByNodeController(adapterDescription)) {
 
-            String nodeControllerId = extractNodeControllerId(adapterDescription);
-            SpEdgeNodeProtocol edgeNodeProtocol = BackendConfig.INSTANCE
-                    .getMessagingSettings()
-                    .getPrioritizedEdgeProtocols()
-                    .get(0);
-
-            if (isEdgeProtocol(edgeNodeProtocol, MqttTransportProtocol.class)) {
-                MqttTransportProtocol brokerTransportProtocol =
-                        (MqttTransportProtocol) getNodeBrokerTransportProtocol(nodeControllerId);
-                brokerTransportProtocol.setTopicDefinition(topicDefinition);
-
-                eventGrounding.setTransportProtocol(brokerTransportProtocol);
-
-            } else if (isEdgeProtocol(edgeNodeProtocol, KafkaTransportProtocol.class)) {
-                KafkaTransportProtocol brokerTransportProtocol =
-                        (KafkaTransportProtocol) getNodeBrokerTransportProtocol(nodeControllerId);
-                brokerTransportProtocol.setTopicDefinition(topicDefinition);
-
-                eventGrounding.setTransportProtocol(brokerTransportProtocol);
+            if (isEdgeOrFogNodeTarget(adapterDescription)) {
+                createEdgeOrFogGrounding(eventGrounding, topicDefinition, adapterDescription);
             } else {
-                throw new AdapterException("Edge node protocol not supported. " + edgeNodeProtocol);
+                createCloudProtocol(eventGrounding, topicDefinition);
             }
 
         } else {
-            SpProtocol prioritizedProtocol =
-                BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
-
-            if (isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
-                eventGrounding.setTransportProtocol(
-                        makeJmsTransportProtocol(
-                                BackendConfig.INSTANCE.getJmsHost(),
-                                BackendConfig.INSTANCE.getJmsPort(),
-                                topicDefinition));
-            } else if (isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)){
-                eventGrounding.setTransportProtocol(
-                        makeKafkaTransportProtocol(
-                                BackendConfig.INSTANCE.getKafkaHost(),
-                                BackendConfig.INSTANCE.getKafkaPort(),
-                                topicDefinition));
-            } else if (isPrioritized(prioritizedProtocol, MqttTransportProtocol.class)) {
-                eventGrounding.setTransportProtocol(
-                        makeMqttTransportProtocol(
-                                BackendConfig.INSTANCE.getMqttHost(),
-                                BackendConfig.INSTANCE.getMqttPort(),
-                                topicDefinition));
-            }
+            createCloudProtocol(eventGrounding, topicDefinition);
         }
 
         eventGrounding.setTransportFormats(Collections
@@ -148,6 +109,57 @@ public class GroundingService {
         return eventGrounding;
     }
 
+    private static void createEdgeOrFogGrounding(EventGrounding eventGrounding, TopicDefinition topicDefinition,
+                                                 AdapterDescription adapterDescription) throws AdapterException {
+        String nodeControllerId = extractNodeControllerId(adapterDescription);
+        SpEdgeNodeProtocol edgeNodeProtocol = BackendConfig.INSTANCE
+                .getMessagingSettings()
+                .getPrioritizedEdgeProtocols()
+                .get(0);
+
+        if (isEdgeProtocol(edgeNodeProtocol, MqttTransportProtocol.class)) {
+            MqttTransportProtocol brokerTransportProtocol =
+                    (MqttTransportProtocol) getNodeBrokerTransportProtocol(nodeControllerId);
+            brokerTransportProtocol.setTopicDefinition(topicDefinition);
+
+            eventGrounding.setTransportProtocol(brokerTransportProtocol);
+
+        } else if (isEdgeProtocol(edgeNodeProtocol, KafkaTransportProtocol.class)) {
+            KafkaTransportProtocol brokerTransportProtocol =
+                    (KafkaTransportProtocol) getNodeBrokerTransportProtocol(nodeControllerId);
+            brokerTransportProtocol.setTopicDefinition(topicDefinition);
+
+            eventGrounding.setTransportProtocol(brokerTransportProtocol);
+        } else {
+            throw new AdapterException("Edge node protocol not supported. " + edgeNodeProtocol);
+        }
+    }
+
+    private static void createCloudProtocol(EventGrounding eventGrounding, TopicDefinition topicDefinition) {
+        SpProtocol prioritizedProtocol =
+                BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
+
+        if (isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
+            eventGrounding.setTransportProtocol(
+                    makeJmsTransportProtocol(
+                            BackendConfig.INSTANCE.getJmsHost(),
+                            BackendConfig.INSTANCE.getJmsPort(),
+                            topicDefinition));
+        } else if (isPrioritized(prioritizedProtocol, KafkaTransportProtocol.class)){
+            eventGrounding.setTransportProtocol(
+                    makeKafkaTransportProtocol(
+                            BackendConfig.INSTANCE.getKafkaHost(),
+                            BackendConfig.INSTANCE.getKafkaPort(),
+                            topicDefinition));
+        } else if (isPrioritized(prioritizedProtocol, MqttTransportProtocol.class)) {
+            eventGrounding.setTransportProtocol(
+                    makeMqttTransportProtocol(
+                            BackendConfig.INSTANCE.getMqttHost(),
+                            BackendConfig.INSTANCE.getMqttPort(),
+                            topicDefinition));
+        }
+    }
+
     private static String extractNodeControllerId(AdapterDescription adapterDescription) {
         if (adapterDescription instanceof GenericAdapterDescription) {
             return ((GenericAdapterDescription) adapterDescription)
@@ -212,4 +224,15 @@ public class GroundingService {
     private static Optional<NodeInfoDescription> getNodeInfoDescriptionForId(String id){
         return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeStorage().getNode(id);
     }
+
+    private static boolean adapterManagedByNodeController(AdapterDescription desc) {
+        return desc.getDeploymentTargetNodeId() != null && !desc.getDeploymentTargetNodeId().equals("default");
+    }
+
+    private static boolean isEdgeOrFogNodeTarget(AdapterDescription desc) {
+        return NodeManagement.getInstance().getAllNodes().stream()
+                .filter(n -> n.getNodeControllerId().equals(desc.getDeploymentTargetNodeId()))
+                .anyMatch(n -> n.getStaticNodeMetadata().getType().equals("edge") ||
+                        n.getStaticNodeMetadata().getType().equals("fog"));
+    }
 }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java
index 72ef4eb..86d5d41 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java
@@ -59,7 +59,7 @@ public class DataSinkPipelineElementResource extends InvocablePipelineElementRes
             protocol.setBrokerHostname("localhost");
             if (graph.getDeploymentTargetNodeId().equals("default")) {
                 if (protocol instanceof KafkaTransportProtocol) {
-                    ((KafkaTransportProtocol) protocol).setKafkaPort(9095);
+                    ((KafkaTransportProtocol) protocol).setKafkaPort(9094);
                 }
             } else {
                 if (protocol instanceof KafkaTransportProtocol) {
diff --git a/streampipes-node-controller/arm.Dockerfile b/streampipes-node-controller/arm.Dockerfile
index cff0520..a155288 100644
--- a/streampipes-node-controller/arm.Dockerfile
+++ b/streampipes-node-controller/arm.Dockerfile
@@ -27,7 +27,11 @@ ENV CONSUL_LOCATION consul
 
 COPY --from=build-dev /usr/bin/qemu-arm-static /usr/bin
 RUN set -ex; \
-    apt -y update; \
+    #apt -y update; \
+        apt update && apt upgrade; \
+        apt -y install gnupg2; \
+        apt-key adv --recv-key --keyserver keyserver.ubuntu.com 648ACFD622F3D138; \
+        apt-key adv --recv-key --keyserver keyserver.ubuntu.com 0E98404D386FA1D9; \
     apt -y --no-install-recommends install libjffi-jni curl; \
     apt clean; \
     rm -rf /tmp/apache-* /var/lib/apt/lists/*
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
index f19e69a..f235a5c 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/EnvConfigParam.java
@@ -41,6 +41,7 @@ public enum EnvConfigParam {
     BACKEND_HOST("SP_BACKEND_HOST", "localhost"),
     BACKEND_PORT("SP_BACKEND_PORT", "8030"),
     DOCKER_PRUNING_FREQ("SP_DOCKER_PRUNING_FREQ_SECS", "3600"),
+    DOCKER_STATS_COLLECT_FREQ("SP_DOCKER_STATS_COLLECT_FREQ_SECS", "1"),
     RESOURCE_UPDATE_FREQ("SP_NODE_RESOURCE_UPDATE_FREQ_SECS", "30"),
     EVENT_RELAY_BUFFER_SIZE("SP_NODE_EVENT_BUFFER_SIZE", "1000"),
     NODE_GPU_ACCESS("SP_NODE_HAS_GPU", "false"),
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
index 2b3a79a..4f9dc58 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/config/NodeConfiguration.java
@@ -55,6 +55,7 @@ public final class NodeConfiguration {
     private static int gpuCores;
     private static String gpuType;
     private static int dockerPruningFreqSecs;
+    private static int dockerStatsCollectFreqSecs;
     private static int resourceMonitorFreqSecs;
     private static int relayEventBufferSize;
     private static String consulHost;
@@ -241,6 +242,14 @@ public final class NodeConfiguration {
         NodeConfiguration.dockerPruningFreqSecs = dockerPruningFreqSecs;
     }
 
+    public static int getDockerStatsCollectFreqSecs() {
+        return dockerStatsCollectFreqSecs;
+    }
+
+    public static void setDockerStatsCollectFreqSecs(int dockerStatsCollectFreqSecs) {
+        NodeConfiguration.dockerStatsCollectFreqSecs = dockerStatsCollectFreqSecs;
+    }
+
     public static int getResourceMonitorFreqSecs() {
         return resourceMonitorFreqSecs;
     }
@@ -464,6 +473,10 @@ public final class NodeConfiguration {
                     configMap.put(envKey, value);
                     setDockerPruningFreqSecs(Integer.parseInt(value));
                     break;
+                case DOCKER_STATS_COLLECT_FREQ:
+                    configMap.put(envKey, value);
+                    setDockerStatsCollectFreqSecs(Integer.parseInt(value));
+                    break;
                 case RESOURCE_UPDATE_FREQ:
                     configMap.put(envKey, value);
                     setResourceMonitorFreqSecs(Integer.parseInt(value));
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
index 26b19d3..1276e16 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/NodeControllerSubmitter.java
@@ -81,7 +81,7 @@ public abstract class NodeControllerSubmitter {
                 LOG.info("Start janitor manager");
                 JanitorManager.getInstance().run();
 
-                // TODO: remove after evaluation tests
+                //TODO: remove after evaluation tests
                 LOG.info("Start docker stats collector");
                 DockerStatsCollector.getInstance().run();
             }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/DockerContainerDeclarerSingleton.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/DockerContainerDeclarerSingleton.java
index 7bd526c..32ee882 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/DockerContainerDeclarerSingleton.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/DockerContainerDeclarerSingleton.java
@@ -64,8 +64,15 @@ public class DockerContainerDeclarerSingleton {
     }
 
     public List<DockerContainer> getAutoDeploymentDockerContainers() {
-        if ("kafka".equals(NodeConfiguration.getNodeBrokerProtocol())) {
-            remove(DockerMosquittoContainer.SP_SVC_MOSQUITTO_ID);
+//        if ("kafka".equals(NodeConfiguration.getNodeBrokerProtocol())) {
+//            remove(DockerMosquittoContainer.SP_SVC_MOSQUITTO_ID);
+//        } else {
+//            remove(DockerKafkaContainer.SP_SVC_KAFKA_ID, DockerZookeeperContainer.SP_SVC_ZOOKEEPER_ID);
+//        }
+        if ("cloud".equals(NodeConfiguration.getNodeType())) {
+            remove(DockerMosquittoContainer.SP_SVC_MOSQUITTO_ID,
+                    DockerKafkaContainer.SP_SVC_KAFKA_ID,
+                    DockerZookeeperContainer.SP_SVC_ZOOKEEPER_ID);
         } else {
             remove(DockerKafkaContainer.SP_SVC_KAFKA_ID, DockerZookeeperContainer.SP_SVC_ZOOKEEPER_ID);
         }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
index b774bc2..9bea9c9 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
@@ -60,12 +60,12 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
         this.consumer = consumerSupplier.get();
         this.producer = producerSupplier.get();
 
-        this.relayInfo = relayInfo();
-        this.metrics = new RelayMetrics(relayInfo.c, sourceProtocol, targetProtocol, eventRelayStrategy);
-
         if ("true".equals(System.getenv("SP_DEBUG"))) {
             modifyProtocolForDebugging();
         }
+
+        this.relayInfo = relayInfo();
+        this.metrics = new RelayMetrics(relayInfo.c, sourceProtocol, targetProtocol, eventRelayStrategy);
     }
 
     protected abstract void modifyProtocolForDebugging();
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java
index 8b56c82..8d40bc4 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/FileSystemType.java
@@ -23,7 +23,8 @@ public enum FileSystemType {
     DISK("/dev/disk"),
     ROOT("/dev/root"),
     MMCBLK("/dev/mmcblk0p1"),
-    SDB("/dev/sdb");
+    SDB("/dev/sdb"),
+    MAPPER("/dev/mapper");
 
     private final String name;
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java
index 717b486..5ad6da3 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/resource/utils/ResourceUtils.java
@@ -78,6 +78,9 @@ public class ResourceUtils {
             addDiskUsage(diskUsage, f);
         } else if (volume.contains(FileSystemType.SDB.getName())){
             addDiskUsage(diskUsage, f);
+        } else if (volume.contains(FileSystemType.MAPPER.getName())) {
+            // block devices
+            addDiskUsage(diskUsage, f);
         }
     }
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
index ccad01c..2d038fc 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
@@ -19,20 +19,23 @@ package org.apache.streampipes.node.controller.management.statscollector;
 
 import com.spotify.docker.client.messages.ContainerStats;
 import org.apache.streampipes.logging.evaluation.EvaluationLogger;
+import org.apache.streampipes.model.Tuple2;
 import org.apache.streampipes.node.controller.config.NodeConfiguration;
 import org.apache.streampipes.node.controller.management.orchestrator.docker.utils.DockerUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class DockerStatsCollector {
 
     private static final Logger LOG = LoggerFactory.getLogger(DockerStatsCollector.class.getCanonicalName());
     private static final String LOGGING_TOPIC = "container/stats/" + NodeConfiguration.getNodeHost();
+    private static final int DOCKER_STATS_COLLECT_FREQ_SECS = NodeConfiguration.getDockerStatsCollectFreqSecs();
 
     private static DockerStatsCollector instance = null;
 
@@ -55,13 +58,19 @@ public class DockerStatsCollector {
 
         Object[] header = new Object[]{
                 "timestamp",
+                "hostName",
                 "containerName",
                 "cpuPercent",
                 "memPercent",
                 "memUsageInBytes",
                 "memUsageHumanReadable",
                 "memTotalInBytes",
-                "memTotalHumanReadable"};
+                "memTotalHumanReadable",
+                "netRxInBytes",
+                "netRxHumanReadable",
+                "netTxInBytes",
+                "netTxHumanReadable"
+        };
 
         EvaluationLogger.getInstance().logMQTT(LOGGING_TOPIC, header);
     }
@@ -69,7 +78,7 @@ public class DockerStatsCollector {
     private final Runnable collect = () -> {
         LOG.debug("Collect Docker stats");
         Map<String, ContainerStats> stats = DockerUtils.getInstance().collectStats();
-        long timestamp = System.currentTimeMillis();
+        long averageTimestamp = calculateAverageTimestamp(stats);
 
         stats.forEach((containerName, containerStats) -> {
 
@@ -78,17 +87,48 @@ public class DockerStatsCollector {
             double memTotal = containerStats.memoryStats().limit();
             double memPercent = (memUsageInBytes / memTotal) * 100.0;
 
+            Tuple2<Long,Long> netRxTx = DockerStatsUtils.calculateNetworkIo(containerStats);
+            long netRxInBytes = netRxTx.a;
+            long netTxInBytes = netRxTx.b;
+
             Object[] collectedStats = new Object[]{
-                    timestamp,
+                    averageTimestamp,
+                    NodeConfiguration.getNodeHost(),
                     containerName,
                     cpuPercent,
                     memPercent,
                     memUsageInBytes,
                     DockerStatsUtils.humanReadableByteCountBin((long) memUsageInBytes),
                     memTotal,
-                    DockerStatsUtils.humanReadableByteCountBin((long) memTotal)};
+                    DockerStatsUtils.humanReadableByteCountBin((long) memTotal),
+                    netRxInBytes,
+                    DockerStatsUtils.humanReadableByteCountBin(netRxInBytes),
+                    netTxInBytes,
+                    DockerStatsUtils.humanReadableByteCountBin(netTxInBytes)
+            };
 
             EvaluationLogger.getInstance().logMQTT(LOGGING_TOPIC, collectedStats);
         });
     };
+
+    // Helpers
+
+    /**
+     * Returns the midpoint (epoch millis, truncated to whole seconds) between the earliest and the
+     * latest {@code read} time of the given stats, used as one common timestamp for all containers.
+     * Falls back to the current time for an empty map instead of failing on an empty Optional.
+     */
+    private long calculateAverageTimestamp(Map<String, ContainerStats> stats) {
+        if (stats.isEmpty()) {
+            return System.currentTimeMillis();
+        }
+        long minSeconds = Long.MAX_VALUE;
+        long maxSeconds = Long.MIN_VALUE;
+        for (ContainerStats containerStats : stats.values()) {
+            long readSeconds = containerStats.read().getTime() / 1000L;
+            minSeconds = Math.min(minSeconds, readSeconds);
+            maxSeconds = Math.max(maxSeconds, readSeconds);
+        }
+        return ((minSeconds + maxSeconds) / 2L) * 1000L;
+    }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsUtils.java
index b4c7be9..a94c172 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsUtils.java
@@ -18,9 +18,13 @@
 package org.apache.streampipes.node.controller.management.statscollector;
 
 import com.spotify.docker.client.messages.ContainerStats;
+import com.spotify.docker.client.messages.NetworkStats;
+import org.apache.streampipes.model.Tuple2;
 
 import java.text.CharacterIterator;
 import java.text.StringCharacterIterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class DockerStatsUtils {
 
@@ -50,6 +54,17 @@ public class DockerStatsUtils {
         return cpuPercent;
     }
 
+    public static Tuple2<Long,Long> calculateNetworkIo(ContainerStats containerStats) {
+        Map<String, NetworkStats> networks = containerStats.networks();
+        AtomicLong rx = new AtomicLong(0L);
+        AtomicLong tx = new AtomicLong(0L);
+        networks.values().forEach(n -> {
+            rx.addAndGet(n.rxBytes());
+            tx.addAndGet(n.txBytes());
+        });
+        return new Tuple2<>(rx.longValue(),tx.longValue());
+    }
+
     public static String humanReadableByteCountBin(long bytes) {
         long absB = bytes == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(bytes);
         if (absB < 1024) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index 1766227..d41367b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -22,7 +22,6 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.config.backend.SpEdgeNodeProtocol;
 import org.apache.streampipes.config.backend.SpProtocol;
-import org.apache.streampipes.container.util.ConsulUtil;
 import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphHelpers;
 import org.apache.streampipes.manager.matching.output.OutputSchemaFactory;
@@ -38,6 +37,7 @@ import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings;
 import org.apache.streampipes.model.node.NodeInfoDescription;
 import org.apache.streampipes.model.output.OutputStrategy;
 import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.node.management.NodeManagement;
 import org.apache.streampipes.sdk.helpers.Tuple2;
 import org.apache.streampipes.storage.api.INodeInfoStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
@@ -101,10 +101,15 @@ public class InvocationGraphBuilder {
           else if (defaultDeploymentTarget(t) && source instanceof DataProcessorInvocation) {
             // target runs on cloud node: use central cloud broker, e.g. kafka
             connectSourceToTarget((DataProcessorInvocation) source, t, inputGrounding, false);
-          }
-          else {
-            // target runs on other edge node: use target edge node broker
-            connectSourceToTarget((DataProcessorInvocation) source, t, inputGrounding, true);
+          } else {
+
+            if (isEdgeOrFogNodeTarget(t)) {
+              // target runs on other edge node: use target edge node broker
+              connectSourceToTarget((DataProcessorInvocation) source, t, inputGrounding, true);
+            } else {
+              // target runs on cloud node managed by node controller
+              connectSourceToTarget((DataProcessorInvocation) source, t, inputGrounding, false);
+            }
           }
 
         } else {
@@ -124,10 +129,13 @@ public class InvocationGraphBuilder {
             t.getInputStreams()
                     .get(getIndex(source.getDOM(),t))
                     .setEventGrounding(eg);
-          } else if (targetInvocableOnEdgeNode(t)) {
+          } else if (targetInvocableManagedByNodeController(t)) {
+
+            boolean isEdgeOrFog = isEdgeOrFogNodeTarget(t);
+
             // case 2: target on other edge node -> relay + target node broker
             // use unique topic for target in case we have multiple source stream relays to the target
-            EventGrounding eg = generateRelayGrounding(inputGrounding,t,true);
+            EventGrounding eg = generateRelayGrounding(inputGrounding,t,isEdgeOrFog);
             String oldTopic = extractTopic(eg);
             eg.getTransportProtocol().getTopicDefinition().setActualTopicName(oldTopic + "."
                     + this.pipelineId);
@@ -165,10 +173,17 @@ public class InvocationGraphBuilder {
                     .get(getRelayIndex((DataProcessorInvocation) source, t)));
   }
 
-  private boolean targetInvocableOnEdgeNode(InvocableStreamPipesEntity t) {
+  private boolean targetInvocableManagedByNodeController(InvocableStreamPipesEntity t) {
     return t.getDeploymentTargetNodeId() != null && !t.getDeploymentTargetNodeId().equals(DEFAULT_TAG);
   }
 
+  private boolean isEdgeOrFogNodeTarget(InvocableStreamPipesEntity t) {
+    // true if a registered node matches t's deployment target id and is typed "edge" or "fog" (null-safe on type)
+    return NodeManagement.getInstance().getAllNodes().stream()
+            .filter(n -> n.getNodeControllerId().equals(t.getDeploymentTargetNodeId()))
+            .map(n -> n.getStaticNodeMetadata().getType()).anyMatch(type -> "edge".equals(type) || "fog".equals(type));
+  }
+
   private boolean defaultDeploymentTarget(InvocableStreamPipesEntity t) {
     return t.getDeploymentTargetNodeId() != null && t.getDeploymentTargetNodeId().equals(DEFAULT_TAG);
   }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index ee90a00..9d595f7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -29,6 +29,7 @@ import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.grounding.*;
 import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.node.management.NodeManagement;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import java.util.List;
@@ -71,15 +72,38 @@ public class ProtocolSelector extends GroundingSelector {
                     .getEventGrounding()
                     .getTransportProtocol();
         } else {
-            if(sourceInvocableOnEdgeNode()) {
-                for (SpEdgeNodeProtocol p: prioritizedEdgeProtocols) {
-                    // use edge node protocol
-                    if (matches(p, MqttTransportProtocol.class) && supportsProtocol(MqttTransportProtocol.class)) {
-                        return mqttTransportProtocolForEdge();
-                    } else if (matches(p, KafkaTransportProtocol.class) && supportsProtocol(KafkaTransportProtocol.class)) {
-                        return kafkaTransportProtocolForEdge();
+            if(sourceInvocableManagedByNodeController()) {
+                InvocableStreamPipesEntity invocableStreamPipesEntity = (InvocableStreamPipesEntity ) source;
+
+                boolean isEdgeOrFog = NodeManagement.getInstance().getAllNodes().stream()
+                        .filter(n -> n.getNodeControllerId()
+                                .equals(invocableStreamPipesEntity.getDeploymentTargetNodeId()))
+                        .anyMatch(n -> n.getStaticNodeMetadata().getType().equals("edge") ||
+                                n.getStaticNodeMetadata().getType().equals("fog"));
+
+                if (isEdgeOrFog) {
+                    for (SpEdgeNodeProtocol p: prioritizedEdgeProtocols) {
+                        // use edge node protocol
+                        if (matches(p, MqttTransportProtocol.class) && supportsProtocol(MqttTransportProtocol.class)) {
+                            return mqttTransportProtocolForEdge();
+                        } else if (matches(p, KafkaTransportProtocol.class) && supportsProtocol(KafkaTransportProtocol.class)) {
+                            return kafkaTransportProtocolForEdge();
+                        }
+                    }
+                } else {
+                    for(SpProtocol p: prioritizedProtocols) {
+                        if (matches(p, KafkaTransportProtocol.class) && supportsProtocol(KafkaTransportProtocol.class)) {
+                            return kafkaTransportProtocol();
+                        } else if (matches(p, JmsTransportProtocol.class) && supportsProtocol(JmsTransportProtocol.class)) {
+                            return jmsTransportProtocol();
+                        } else if (matches(p, MqttTransportProtocol.class) && supportsProtocol(MqttTransportProtocol.class)) {
+                            return mqttTransportProtocol();
+                        } else {
+                            throw new IllegalArgumentException("Transport protocol not found: " + p.getProtocolClass());
+                        }
                     }
                 }
+
             } else {
                 for(SpProtocol p: prioritizedProtocols) {
                     if (matches(p, KafkaTransportProtocol.class) && supportsProtocol(KafkaTransportProtocol.class)) {
@@ -149,7 +173,7 @@ public class ProtocolSelector extends GroundingSelector {
         }
     }
 
-    private boolean sourceInvocableOnEdgeNode() {
+    private boolean sourceInvocableManagedByNodeController() {
         return ((InvocableStreamPipesEntity) source).getDeploymentTargetNodeId() != null &&
                 !((InvocableStreamPipesEntity) source).getDeploymentTargetNodeId().equals("default");
     }
diff --git a/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts b/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts
index 1a67e17..454e7db 100644
--- a/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts
+++ b/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts
@@ -59,6 +59,7 @@ export class NodeConfigurationDetailsComponent implements OnInit {
     ];
 
   deviceTypes = [
+    {value: 'gpsmodule', viewValue: 'GPS module'},
     {value: 'sensor', viewValue: 'Sensor'},
     {value: 'actuator', viewValue: 'Actuator'},
     {value: 'camera', viewValue: 'Camera'},
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index 481130f..1134476 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -222,17 +222,17 @@ export class SavePipelineComponent implements OnInit {
         })
       })
 
-      // this.tmpPipeline.actions.forEach(actions => {
-      //   this.deploymentOptions[actions.appId] = [];
-      //
-      //   filteredNodes.forEach(filteredNode => {
-      //
-      //     if (filteredNode.supportedElements.length != 0 &&
-      //         filteredNode.supportedElements.some(appId => appId === actions.appId)) {
-      //       this.deploymentOptions[actions.appId].push(filteredNode);
-      //     }
-      //   })
-      // })
+      this.tmpPipeline.actions.forEach(actions => {
+        this.deploymentOptions[actions.appId] = [];
+
+        filteredNodes.forEach(filteredNode => {
+
+          if (filteredNode.supportedElements.length != 0 &&
+              filteredNode.supportedElements.some(appId => appId === actions.appId)) {
+            this.deploymentOptions[actions.appId].push(filteredNode);
+          }
+        })
+      })
 
     } else {
       this.addAppIds(this.tmpPipeline.sepas, this.edgeNodes);