You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/23 10:57:57 UTC

[incubator-streampipes] branch edge-extensions updated: refactor node-controller and add initial work on local container management

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new 84d72fa  refactor node-controller and add initial work on local container management
84d72fa is described below

commit 84d72fa4fde52d4ae3c252d44d1f183b8a3cc5e0
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon Nov 23 11:57:34 2020 +0100

    refactor node-controller and add initial work on local container management
---
 .idea/runConfigurations/all_pe_jvm__edge_01_.xml   |  35 +++++
 .idea/runConfigurations/all_pe_jvm__edge_02_.xml   |  35 +++++
 .idea/runConfigurations/all_pe_jvm__fog_.xml       |  35 +++++
 .idea/runConfigurations/all_pe_jvm__primary_.xml   |  21 +++
 .idea/runConfigurations/node_controller_cloud.xml  |  32 ++++
 .../runConfigurations/node_controller_edge_01.xml  |  34 +++++
 .../runConfigurations/node_controller_edge_02.xml  |  33 +++++
 .idea/runConfigurations/node_controller_fog.xml    |  32 ++++
 .../standalone/init/StandaloneModelSubmitter.java  |  28 ++--
 .../model/node/InvocableRegistration.java          |  52 +++++++
 .../container/util/NodeControllerUtil.java         | 154 +++++++++++---------
 .../streampipes/model/node/NodeInfoBuilder.java    |   5 +
 .../streampipes/model/node/NodeMetadata.java       |  12 +-
 .../model/node/PipelineElementDockerContainer.java |  17 ++-
 .../PipelineElementDockerContainerBuilder.java     |  73 ++++++++++
 streampipes-node-controller-container/pom.xml      |  10 ++
 ...ainer.java => NodeControllerContainerInit.java} |  15 +-
 .../controller/container/config/ConfigKeys.java    |  31 ++--
 .../container/config/NodeControllerConfig.java     | 161 ++++++++-------------
 ...ContainerStatus.java => IRunningInstances.java} |  17 ++-
 .../container/management/info/NodeInfoStorage.java |   9 +-
 .../management/node/NodeJanitorManager.java        |   2 +-
 .../ContainerOrchestrator.java                     |   7 +-
 .../ContainerStatus.java                           |   3 +-
 .../RunningContainerInstances.java}                |  24 +--
 .../docker/DockerContainerOrchestrator.java}       |  44 +++---
 .../docker}/DockerInfo.java                        |   3 +-
 .../orchestrator/docker/DockerNodeContainer.java   |  67 +++++++++
 .../docker}/DockerUtils.java                       |  31 ++--
 .../management/pe/PipelineElementManager.java      |  64 +++++++-
 .../management/relay/RunningRelayInstances.java    |  14 +-
 .../management/resource/ResourceManager.java       |   1 -
 .../container/rest/DebugRelayResource.java         |   2 +-
 ...ource.java => InvocableManagementResource.java} |  69 ++++-----
 .../rest/NodeControllerResourceConfig.java         |   2 +-
 .../manager/execution/http/GraphSubmitter.java     |   8 +-
 .../manager/matching/InvocationGraphBuilder.java   |   6 +-
 .../save-pipeline/save-pipeline.component.html     |   2 +-
 .../save-pipeline/save-pipeline.component.ts       |   4 +-
 39 files changed, 864 insertions(+), 330 deletions(-)

diff --git a/.idea/runConfigurations/all_pe_jvm__edge_01_.xml b/.idea/runConfigurations/all_pe_jvm__edge_01_.xml
new file mode 100644
index 0000000..ae3a99a
--- /dev/null
+++ b/.idea/runConfigurations/all_pe_jvm__edge_01_.xml
@@ -0,0 +1,35 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="all-pe-jvm (edge-01)" type="Application" factoryName="Application" folderName="edge-extensions">
+    <envs>
+      <env name="SP_PORT" value="7024" />
+      <env name="SP_HOST" value="edge-01.fzi.de" />
+      <env name="SP_DEBUG" value="true" />
+      <env name="SP_COUCHDB_HOST" value="localhost" />
+      <env name="SP_JMS_HOST" value="localhost" />
+      <env name="SP_JMS_PORT" value="61616" />
+      <env name="SP_DATA_LAKE_HOST" value="localhost" />
+      <env name="SP_DATA_LAKE_PORT" value="8086" />
+      <env name="SP_BACKEND_HOST" value="localhost" />
+      <env name="SP_BACKEND_PORT" value="8030" />
+      <env name="SP_NODE_ID" value="edge-01.node-controller" />
+      <env name="SP_NODE_CONTROLLER_HOST" value="edge-01.fzi.de" />
+      <env name="SP_NODE_CONTROLLER_PORT" value="7077" />
+    </envs>
+    <option name="MAIN_CLASS_NAME" value="org.apache.streampipes.pe.jvm.AllPipelineElementsInit" />
+    <module name="streampipes-pipeline-elements-all-jvm" />
+    <extension name="net.ashald.envfile">
+      <option name="IS_ENABLED" value="true" />
+      <option name="IS_SUBST" value="false" />
+      <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+      <option name="IS_IGNORE_MISSING_FILES" value="false" />
+      <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+      <ENTRIES>
+        <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+        <ENTRY IS_ENABLED="true" PARSER="env" PATH="streampipes-pipeline-elements-all-jvm/development/env.secondary" />
+      </ENTRIES>
+    </extension>
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/all_pe_jvm__edge_02_.xml b/.idea/runConfigurations/all_pe_jvm__edge_02_.xml
new file mode 100644
index 0000000..15893a6
--- /dev/null
+++ b/.idea/runConfigurations/all_pe_jvm__edge_02_.xml
@@ -0,0 +1,35 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="all-pe-jvm (edge-02)" type="Application" factoryName="Application" folderName="edge-extensions">
+    <envs>
+      <env name="SP_PORT" value="7025" />
+      <env name="SP_HOST" value="edge-02.fzi.de" />
+      <env name="SP_DEBUG" value="true" />
+      <env name="SP_COUCHDB_HOST" value="localhost" />
+      <env name="SP_JMS_HOST" value="localhost" />
+      <env name="SP_JMS_PORT" value="61616" />
+      <env name="SP_DATA_LAKE_HOST" value="localhost" />
+      <env name="SP_DATA_LAKE_PORT" value="8086" />
+      <env name="SP_BACKEND_HOST" value="localhost" />
+      <env name="SP_BACKEND_PORT" value="8030" />
+      <env name="SP_NODE_ID" value="edge-02.node-controller" />
+      <env name="SP_NODE_CONTROLLER_HOST" value="edge-02.fzi.de" />
+      <env name="SP_NODE_CONTROLLER_PORT" value="7078" />
+    </envs>
+    <option name="MAIN_CLASS_NAME" value="org.apache.streampipes.pe.jvm.AllPipelineElementsInit" />
+    <module name="streampipes-pipeline-elements-all-jvm" />
+    <extension name="net.ashald.envfile">
+      <option name="IS_ENABLED" value="true" />
+      <option name="IS_SUBST" value="false" />
+      <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+      <option name="IS_IGNORE_MISSING_FILES" value="false" />
+      <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+      <ENTRIES>
+        <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+        <ENTRY IS_ENABLED="true" PARSER="env" PATH="streampipes-pipeline-elements-all-jvm/development/env.secondary" />
+      </ENTRIES>
+    </extension>
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/all_pe_jvm__fog_.xml b/.idea/runConfigurations/all_pe_jvm__fog_.xml
new file mode 100644
index 0000000..0ece8ec
--- /dev/null
+++ b/.idea/runConfigurations/all_pe_jvm__fog_.xml
@@ -0,0 +1,35 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="all-pe-jvm (fog)" type="Application" factoryName="Application" folderName="edge-extensions">
+    <envs>
+      <env name="SP_PORT" value="7026" />
+      <env name="SP_HOST" value="fog.fzi.de" />
+      <env name="SP_DEBUG" value="true" />
+      <env name="SP_COUCHDB_HOST" value="localhost" />
+      <env name="SP_JMS_HOST" value="localhost" />
+      <env name="SP_JMS_PORT" value="61616" />
+      <env name="SP_DATA_LAKE_HOST" value="localhost" />
+      <env name="SP_DATA_LAKE_PORT" value="8086" />
+      <env name="SP_BACKEND_HOST" value="localhost" />
+      <env name="SP_BACKEND_PORT" value="8030" />
+      <env name="SP_NODE_ID" value="fog.node-controller" />
+      <env name="SP_NODE_CONTROLLER_HOST" value="fog.fzi.de" />
+      <env name="SP_NODE_CONTROLLER_PORT" value="7079" />
+    </envs>
+    <option name="MAIN_CLASS_NAME" value="org.apache.streampipes.pe.jvm.AllPipelineElementsInit" />
+    <module name="streampipes-pipeline-elements-all-jvm" />
+    <extension name="net.ashald.envfile">
+      <option name="IS_ENABLED" value="true" />
+      <option name="IS_SUBST" value="false" />
+      <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+      <option name="IS_IGNORE_MISSING_FILES" value="false" />
+      <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+      <ENTRIES>
+        <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+        <ENTRY IS_ENABLED="true" PARSER="env" PATH="streampipes-pipeline-elements-all-jvm/development/env.secondary" />
+      </ENTRIES>
+    </extension>
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/all_pe_jvm__primary_.xml b/.idea/runConfigurations/all_pe_jvm__primary_.xml
new file mode 100644
index 0000000..ce8acb6
--- /dev/null
+++ b/.idea/runConfigurations/all_pe_jvm__primary_.xml
@@ -0,0 +1,21 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="all-pe-jvm (primary)" type="Application" factoryName="Application" folderName="edge-extensions">
+    <option name="MAIN_CLASS_NAME" value="org.apache.streampipes.pe.jvm.AllPipelineElementsInit" />
+    <module name="streampipes-pipeline-elements-all-jvm" />
+    <envs>
+      <env name="SP_PORT" value="7023" />
+      <env name="SP_HOST" value="host.docker.internal" />
+      <env name="SP_DEBUG" value="true" />
+      <env name="SP_COUCHDB_HOST" value="localhost" />
+      <env name="SP_JMS_HOST" value="localhost" />
+      <env name="SP_JMS_PORT" value="61616" />
+      <env name="SP_DATA_LAKE_HOST" value="localhost" />
+      <env name="SP_DATA_LAKE_PORT" value="8086" />
+      <env name="SP_BACKEND_HOST" value="localhost" />
+      <env name="SP_BACKEND_PORT" value="8030" />
+    </envs>
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/node_controller_cloud.xml b/.idea/runConfigurations/node_controller_cloud.xml
new file mode 100644
index 0000000..6cfc672
--- /dev/null
+++ b/.idea/runConfigurations/node_controller_cloud.xml
@@ -0,0 +1,32 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="node-controller-cloud" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" folderName="edge-extensions">
+    <module name="streampipes-node-controller-container" />
+    <extension name="net.ashald.envfile">
+      <option name="IS_ENABLED" value="true" />
+      <option name="IS_SUBST" value="false" />
+      <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+      <option name="IS_IGNORE_MISSING_FILES" value="false" />
+      <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+      <ENTRIES>
+        <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+        <ENTRY IS_ENABLED="true" PARSER="env" PATH="$PROJECT_DIR$/../incubator-streampipes/streampipes-node-controller-container/development/env" />
+      </ENTRIES>
+    </extension>
+    <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.node.controller.container.NodeControllerContainerInit" />
+    <option name="ALTERNATIVE_JRE_PATH" />
+    <envs>
+      <env name="SP_BACKEND_HOST" value="host.docker.internal" />
+      <env name="CONSUL_LOCATION" value="host.docker.internal" />
+      <env name="SP_NODE_CONTROLLER_ID" value="cloud.node-controller" />
+      <env name="SP_NODE_CONTROLLER_PORT" value="7080" />
+      <env name="SP_NODE_HOST" value="cloud.fzi.de" />
+      <env name="SP_NODE_TYPE" value="cloud" />
+      <env name="SP_NODE_HAS_GPU" value="true" />
+      <env name="SP_NODE_LOCATION_CLOUD" value="bwcloud" />
+      <env name="SP_DEBUG" value="true" />
+    </envs>
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/node_controller_edge_01.xml b/.idea/runConfigurations/node_controller_edge_01.xml
new file mode 100644
index 0000000..450f314
--- /dev/null
+++ b/.idea/runConfigurations/node_controller_edge_01.xml
@@ -0,0 +1,34 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="node-controller-edge-01" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" folderName="edge-extensions">
+    <module name="streampipes-node-controller-container" />
+    <extension name="net.ashald.envfile">
+      <option name="IS_ENABLED" value="true" />
+      <option name="IS_SUBST" value="false" />
+      <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+      <option name="IS_IGNORE_MISSING_FILES" value="false" />
+      <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+      <ENTRIES>
+        <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+        <ENTRY IS_ENABLED="true" PARSER="env" PATH="$PROJECT_DIR$/../incubator-streampipes/streampipes-node-controller-container/development/env" />
+      </ENTRIES>
+    </extension>
+    <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.node.controller.container.NodeControllerContainerInit" />
+    <option name="ALTERNATIVE_JRE_PATH" />
+    <envs>
+      <env name="SP_BACKEND_HOST" value="host.docker.internal" />
+      <env name="CONSUL_LOCATION" value="host.docker.internal" />
+      <env name="SP_NODE_CONTROLLER_ID" value="edge-01.node-controller" />
+      <env name="SP_NODE_CONTROLLER_PORT" value="7077" />
+      <env name="SP_NODE_HOST" value="edge-01.fzi.de" />
+      <env name="SP_NODE_TYPE" value="edge" />
+      <env name="SP_NODE_ACCESSIBLE_SENSOR_ACTUATOR_ROS" value="ros;universalrobot;ipe-k3s-pi1.fzi.de;network" />
+      <env name="SP_NODE_HAS_GPU" value="false" />
+      <env name="SP_NODE_LOCATION_BUILDING" value="holl" />
+      <env name="SP_NODE_LOCATION_MACHINE" value="UR" />
+      <env name="SP_DEBUG" value="true" />
+    </envs>
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/node_controller_edge_02.xml b/.idea/runConfigurations/node_controller_edge_02.xml
new file mode 100644
index 0000000..c97ce50
--- /dev/null
+++ b/.idea/runConfigurations/node_controller_edge_02.xml
@@ -0,0 +1,33 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="node-controller-edge-02" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" folderName="edge-extensions">
+    <module name="streampipes-node-controller-container" />
+    <extension name="net.ashald.envfile">
+      <option name="IS_ENABLED" value="true" />
+      <option name="IS_SUBST" value="false" />
+      <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+      <option name="IS_IGNORE_MISSING_FILES" value="false" />
+      <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+      <ENTRIES>
+        <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+        <ENTRY IS_ENABLED="true" PARSER="env" PATH="$PROJECT_DIR$/../incubator-streampipes/streampipes-node-controller-container/development/env" />
+      </ENTRIES>
+    </extension>
+    <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.node.controller.container.NodeControllerContainerInit" />
+    <option name="ALTERNATIVE_JRE_PATH" />
+    <envs>
+      <env name="SP_BACKEND_HOST" value="host.docker.internal" />
+      <env name="CONSUL_LOCATION" value="host.docker.internal" />
+      <env name="SP_NODE_CONTROLLER_ID" value="edge-02.node-controller" />
+      <env name="SP_NODE_CONTROLLER_PORT" value="7078" />
+      <env name="SP_NODE_HOST" value="edge-02.fzi.de" />
+      <env name="SP_NODE_TYPE" value="edge" />
+      <env name="SP_NODE_ACCESSIBLE_SENSOR_ACTUATOR_ZED" value="zed;sensor;/dev/video1;usb" />
+      <env name="SP_NODE_HAS_GPU" value="true" />
+      <env name="SP_NODE_LOCATION_BUILDING" value="holl" />
+      <env name="SP_DEBUG" value="true" />
+    </envs>
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/node_controller_fog.xml b/.idea/runConfigurations/node_controller_fog.xml
new file mode 100644
index 0000000..041e321
--- /dev/null
+++ b/.idea/runConfigurations/node_controller_fog.xml
@@ -0,0 +1,32 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="node-controller-fog" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" folderName="edge-extensions">
+    <module name="streampipes-node-controller-container" />
+    <extension name="net.ashald.envfile">
+      <option name="IS_ENABLED" value="true" />
+      <option name="IS_SUBST" value="false" />
+      <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+      <option name="IS_IGNORE_MISSING_FILES" value="false" />
+      <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+      <ENTRIES>
+        <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+        <ENTRY IS_ENABLED="true" PARSER="env" PATH="$PROJECT_DIR$/../incubator-streampipes/streampipes-node-controller-container/development/env" />
+      </ENTRIES>
+    </extension>
+    <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.node.controller.container.NodeControllerContainerInit" />
+    <option name="ALTERNATIVE_JRE_PATH" />
+    <envs>
+      <env name="SP_BACKEND_HOST" value="host.docker.internal" />
+      <env name="CONSUL_LOCATION" value="host.docker.internal" />
+      <env name="SP_NODE_CONTROLLER_ID" value="fog.node-controller" />
+      <env name="SP_NODE_CONTROLLER_PORT" value="7079" />
+      <env name="SP_NODE_HOST" value="fog.fzi.de" />
+      <env name="SP_NODE_TYPE" value="fog" />
+      <env name="SP_NODE_HAS_GPU" value="true" />
+      <env name="SP_NODE_LOCATION_BUILDING" value="main" />
+      <env name="SP_DEBUG" value="true" />
+    </envs>
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>
\ No newline at end of file
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 377042f..0d28466 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.container.standalone.init;
 
 
+import org.apache.streampipes.container.util.NodeControllerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.SpringApplication;
@@ -56,17 +57,22 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter {
         app.setDefaultProperties(Collections.singletonMap("server.port", peConfig.getPort()));
         app.run();
 
-        ConsulUtil.registerPeService(
-                peConfig.getId(),
-                peConfig.getHost(),
-                peConfig.getPort()
-        );
-
-//        NodeUtil.registerPeService(
-//                peConfig.getId(),
-//                peConfig.getHost(),
-//                peConfig.getPort()
-//        );
+        // check whether pipeline element is managed by node controller
+        if (System.getenv("SP_NODE_ID") != null) {
+            // secondary
+            // register pipeline element service via node controller
+            NodeControllerUtil.register(
+                    peConfig.getId(),
+                    peConfig.getHost(),
+                    peConfig.getPort(),
+                    DeclarersSingleton.getInstance().getEpaDeclarers());
+        } else {
+            // primary
+            ConsulUtil.registerPeService(
+                    peConfig.getId(),
+                    peConfig.getHost(),
+                    peConfig.getPort());
+        }
     }
 
     @PreDestroy
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/node/InvocableRegistration.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/node/InvocableRegistration.java
new file mode 100644
index 0000000..4605b6c
--- /dev/null
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/node/InvocableRegistration.java
@@ -0,0 +1,52 @@
+package org.apache.streampipes.container.model.node;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import org.apache.streampipes.container.model.consul.ConsulServiceRegistrationBody;
+
+import java.util.List;
+
+public class InvocableRegistration {
+
+    private ConsulServiceRegistrationBody consulServiceRegistrationBody;
+    private List<String> supportedPipelineElementAppIds;
+
+    public InvocableRegistration() {
+    }
+
+    public InvocableRegistration(ConsulServiceRegistrationBody consulServiceRegistrationBody,
+                                 List<String> supportedPipelineElementAppIds) {
+        this.consulServiceRegistrationBody = consulServiceRegistrationBody;
+        this.supportedPipelineElementAppIds = supportedPipelineElementAppIds;
+    }
+
+    public ConsulServiceRegistrationBody getConsulServiceRegistrationBody() {
+        return consulServiceRegistrationBody;
+    }
+
+    public void setConsulServiceRegistrationBody(ConsulServiceRegistrationBody consulServiceRegistrationBody) {
+        this.consulServiceRegistrationBody = consulServiceRegistrationBody;
+    }
+
+    public List<String> getSupportedPipelineElementAppIds() {
+        return supportedPipelineElementAppIds;
+    }
+
+    public void setSupportedPipelineElementAppIds(List<String> supportedPipelineElementAppIds) {
+        this.supportedPipelineElementAppIds = supportedPipelineElementAppIds;
+    }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
index b74a422..fd99736 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
@@ -16,105 +16,117 @@ package org.apache.streampipes.container.util;/*
  *
  */
 
-import com.google.gson.Gson;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.methods.RequestBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.HttpClients;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
+import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.container.model.consul.ConsulServiceRegistrationBody;
 import org.apache.streampipes.container.model.consul.HealthCheckConfiguration;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class NodeControllerUtil {
     static Logger LOG = LoggerFactory.getLogger(NodeControllerUtil.class);
 
     private static final String PROTOCOL = "http://";
-
+    private static final String COLON = ":";
+    private static final String SLASH = "/";
     private static final String HEALTH_CHECK_INTERVAL = "10s";
     private static final String PE_SERVICE_NAME = "pe";
-    private static final String PRIMARY_PE_IDENTIFIER = "primary";
-    private static final String SECONDARY_PE_IDENTIFIER = "secondary";
-    private static final String NODE_ID_IDENTIFIER = "SP_NODE_ID";
-    private static final String SLASH = "/";
+    private static final String PE_SECONDARY_TAG = "secondary";
+    private static final String NODE_CONTROLLER_REGISTER_SVC_URL = "node/container/register";
+
+    public static void register(String serviceID, String host, int port,
+                                Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+        register(PE_SERVICE_NAME, makeSvcId(host, serviceID), host, port,
+                Arrays.asList(PE_SERVICE_NAME, PE_SECONDARY_TAG), epaDeclarers);
+    }
+
+    public static void register(String svcName, String svcId, String url, int port, List<String> tag,
+                                Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
 
-    private static final String NODE_CONTROLLER_LOCATION = "SP_NODE_ID";
-    private static final String NODE_CONTROLLER_REGISTER_URL = "node/pe/container/register";
+        boolean connected = false;
 
-    // dummy class to route consul registration request to node controller instead of consul
+        while (!connected) {
+            LOG.info("Trying to register pipeline element container at node controller: " + makeRegistrationEndpoint());
+            String body = createSvcBody(svcName, svcId, url, port, tag, epaDeclarers);
+            connected = registerSvcHttpClient(body);
 
-    public static void registerPeService(String serviceID, String url, int port) {
-        String serviceLocationTag = System.getenv(NODE_ID_IDENTIFIER) == null ? PRIMARY_PE_IDENTIFIER : SECONDARY_PE_IDENTIFIER;
-        String uniquePEServiceId = url + SLASH + serviceID;
-        registerService(PE_SERVICE_NAME, uniquePEServiceId, url, port, Arrays.asList("pe", serviceLocationTag));
+            if (!connected) {
+                LOG.info("Retrying in 5 seconds");
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        LOG.info("Successfully registered pipeline element container: " + svcId);
     }
 
-    public static void registerService(String serviceName, String serviceID, String url, int port, List<String> tag) {
-        String body = createServiceRegisterBody(serviceName, serviceID, url, port, tag);
+    private static boolean registerSvcHttpClient(String body) {
+        String endpoint = makeRegistrationEndpoint();
         try {
-            registerServiceHttpClient(body);
-            LOG.info("Register service " + serviceID +" successful");
+            Request.Post(makeRegistrationEndpoint())
+                    .bodyString(body, ContentType.APPLICATION_JSON)
+                    .connectTimeout(1000)
+                    .socketTimeout(100000)
+                    .execute();
+            return true;
         } catch (IOException e) {
-            LOG.error("Register service: " + serviceID, " - " + e.toString());
+            LOG.error("Could not register at " + endpoint);
         }
+        return false;
     }
 
-    private static void registerServiceHttpClient(String body) throws IOException {
-//        return Request.Post("http://localhost:7077/node/pe/container/register")
-//                .addHeader("accept", "application/json")
-//                .body(new StringEntity(body))
-//                .execute()
-//                .returnResponse()
-//                .getStatusLine().getStatusCode();
-        HttpClient client = HttpClients.custom().build();
-        HttpUriRequest request = RequestBuilder.post()
-                .setUri(nodeControllerURL().toString() + SLASH + NODE_CONTROLLER_REGISTER_URL)
-                .setEntity(new StringEntity(body))
-                .setHeader(HttpHeaders.CONTENT_TYPE, "application/json")
-                .build();
-        client.execute(request);
-    }
+    private static String createSvcBody(String name, String id, String url, int port, List<String> tags,
+                                        Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+        try {
+            ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
+            String healthCheckURL = PROTOCOL + url + COLON + port;
+            body.setID(id);
+            body.setName(name);
+            body.setTags(tags);
+            body.setAddress(PROTOCOL + url);
+            body.setPort(port);
+            body.setEnableTagOverride(true);
+            body.setCheck(new HealthCheckConfiguration("GET", healthCheckURL, HEALTH_CHECK_INTERVAL));
 
-    private static String createServiceRegisterBody(String name, String id, String url, int port, List<String> tags) {
-        String healthCheckURL = PROTOCOL + url + ":" + port;
-        ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
-        body.setID(id);
-        body.setName(name);
-        body.setTags(tags);
-        body.setAddress(PROTOCOL + url);
-        body.setPort(port);
-        body.setEnableTagOverride(true);
-        body.setCheck(new HealthCheckConfiguration("GET", healthCheckURL, HEALTH_CHECK_INTERVAL));
+            InvocableRegistration svcBody = new InvocableRegistration();
+            svcBody.setConsulServiceRegistrationBody(body);
+            svcBody.setSupportedPipelineElementAppIds(new ArrayList<>(epaDeclarers.keySet()));
 
-        return new Gson().toJson(body);
+            return JacksonSerializer.getObjectMapper().writeValueAsString(svcBody);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        }
+        throw new IllegalArgumentException("Failure");
     }
 
-    private static URL nodeControllerURL() {
-        Map<String, String> env = System.getenv();
-        URL url = null;
-
-        if (env.containsKey(NODE_CONTROLLER_LOCATION)) {
-            try {
-                url = new URL("http", env.get(NODE_CONTROLLER_LOCATION), 7077, "");
-            } catch (MalformedURLException e) {
-                e.printStackTrace();
-            }
+    private static String makeRegistrationEndpoint() {
+        if (System.getenv("SP_NODE_CONTROLLER_HOST") != null) {
+            return PROTOCOL
+                    + System.getenv("SP_NODE_CONTROLLER_HOST")
+                    + COLON
+                    + System.getenv("SP_NODE_CONTROLLER_PORT")
+                    + SLASH
+                    + NODE_CONTROLLER_REGISTER_SVC_URL;
         } else {
-            try {
-                url = new URL("http", "localhost", 7077, "");
-            } catch (MalformedURLException e) {
-                e.printStackTrace();
-            }
+            return PROTOCOL
+                    + "localhost"
+                    + COLON
+                    + "7077"
+                    + SLASH
+                    + NODE_CONTROLLER_REGISTER_SVC_URL;
         }
-        return url;
+    }
+
+    private static String makeSvcId(String host, String serviceID) {
+        return host + SLASH + serviceID;
     }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
index d8a62ea..2986d4f 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
@@ -60,6 +60,11 @@ public class NodeInfoBuilder {
         return this;
     }
 
+    public NodeInfoBuilder withNodeType(String nodeType) {
+        this.nodeMetadata.setNodeType(nodeType);
+        return this;
+    }
+
     public NodeInfoBuilder withNodeLocation(List<String> nodeLocationTags) {
         this.nodeMetadata.setNodeLocationTags(nodeLocationTags);
         return this;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeMetadata.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeMetadata.java
index 9195389..098d02b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeMetadata.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeMetadata.java
@@ -26,17 +26,27 @@ public class NodeMetadata {
 
     private String nodeAddress;
     private String nodeModel;
+    private String nodeType;
     private List<String> nodeLocationTags;
 
     public NodeMetadata() {
     }
 
-    public NodeMetadata(String nodeAddress, String nodeModel, List<String> nodeLocationTags) {
+    public NodeMetadata(String nodeAddress, String nodeModel, String nodeType, List<String> nodeLocationTags) {
         this.nodeAddress = nodeAddress;
         this.nodeModel = nodeModel;
+        this.nodeType = nodeType;
         this.nodeLocationTags = nodeLocationTags;
     }
 
+    public String getNodeType() {
+        return nodeType;
+    }
+
+    public void setNodeType(String nodeType) {
+        this.nodeType = nodeType;
+    }
+
     public String getNodeAddress() {
         return nodeAddress;
     }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainer.java
index 17ad3f3..77641fa 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainer.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.model.node;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,10 +15,12 @@ package org.apache.streampipes.model.node;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.model.node;
 
 import io.fogsy.empire.annotations.RdfProperty;
 import io.fogsy.empire.annotations.RdfsClass;
 import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
+import org.apache.streampipes.model.shared.annotation.TsModel;
 import org.apache.streampipes.vocabulary.StreamPipes;
 
 import javax.persistence.Entity;
@@ -28,6 +30,7 @@ import java.util.Map;
 
 @RdfsClass(StreamPipes.PE_DOCKER_CONTAINER)
 @Entity
+@TsModel
 public class PipelineElementDockerContainer extends UnnamedStreamPipesEntity {
 
     @RdfProperty(StreamPipes.PE_DOCKER_CONTAINER_IMAGE_URI)
@@ -68,12 +71,12 @@ public class PipelineElementDockerContainer extends UnnamedStreamPipesEntity {
 
     public PipelineElementDockerContainer(PipelineElementDockerContainer other) {
         super(other);
-        this.imageURI = imageURI;
-        this.containerName = containerName;
-        this.serviceId = serviceId;
-        this.containerPorts = containerPorts;
-        this.envVars = envVars;
-        this.labels = labels;
+        this.imageURI = other.getImageURI();
+        this.containerName = other.getContainerName();
+        this.serviceId = other.getServiceId();
+        this.containerPorts = other.getContainerPorts();
+        this.envVars = other.getEnvVars();
+        this.labels = other.getLabels();
     }
 
     public String getImageURI() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainerBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainerBuilder.java
new file mode 100644
index 0000000..5e9222b
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainerBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.node;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Fluent builder for {@link PipelineElementDockerContainer}.
+ *
+ * Every {@code with...} call writes straight through to the single container
+ * instance created in the constructor; {@link #build()} returns that instance.
+ */
+public class PipelineElementDockerContainerBuilder {
+
+    // The instance being configured; handed out as-is by build().
+    private final PipelineElementDockerContainer pipelineElementDockerContainer;
+
+    /**
+     * @param id service id set on the new container
+     */
+    public PipelineElementDockerContainerBuilder(String id) {
+        this.pipelineElementDockerContainer = new PipelineElementDockerContainer();
+        this.pipelineElementDockerContainer.setServiceId(id);
+    }
+
+    /** Shorthand for {@code new PipelineElementDockerContainerBuilder(id)}. */
+    public static PipelineElementDockerContainerBuilder create(String id) {
+        return new PipelineElementDockerContainerBuilder(id);
+    }
+
+    /** Sets the container's image URI. */
+    public PipelineElementDockerContainerBuilder withImage(String imageUri) {
+        this.pipelineElementDockerContainer.setImageURI(imageUri);
+        return this;
+    }
+
+    /** Sets the container name. */
+    public PipelineElementDockerContainerBuilder withName(String name) {
+        this.pipelineElementDockerContainer.setContainerName(name);
+        return this;
+    }
+
+    /** Sets the container ports. */
+    public PipelineElementDockerContainerBuilder withExposedPorts(String[] ports) {
+        this.pipelineElementDockerContainer.setContainerPorts(ports);
+        return this;
+    }
+
+    /** Sets the container's environment variables. */
+    public PipelineElementDockerContainerBuilder withEnvironmentVariables(List<String> envs) {
+        this.pipelineElementDockerContainer.setEnvVars(envs);
+        return this;
+    }
+
+    /** Sets the container's labels. */
+    public PipelineElementDockerContainerBuilder withLabels(Map<String, String> labels) {
+        this.pipelineElementDockerContainer.setLabels(labels);
+        return this;
+    }
+
+    /** Returns the configured container (the same instance on every call). */
+    public PipelineElementDockerContainer build() {
+        return pipelineElementDockerContainer;
+    }
+
+}
diff --git a/streampipes-node-controller-container/pom.xml b/streampipes-node-controller-container/pom.xml
index 9f274d6..aba22fd 100644
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@ -117,6 +117,11 @@
             <artifactId>streampipes-messaging-kafka</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-rest-shared</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
similarity index 81%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
index 8210dc4..ece533b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
@@ -18,6 +18,9 @@ package org.apache.streampipes.node.controller.container;
  */
 
 import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.model.node.PipelineElementDockerContainer;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
+import org.apache.streampipes.node.controller.container.management.pe.PipelineElementManager;
 import org.apache.streampipes.node.controller.container.rest.NodeControllerResourceConfig;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
@@ -36,16 +39,16 @@ import java.util.Collections;
 @Configuration
 @EnableAutoConfiguration
 @Import({ NodeControllerResourceConfig.class })
-public class NodeControllerContainer {
+public class NodeControllerContainerInit {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(NodeControllerContainer.class.getCanonicalName());
+            LoggerFactory.getLogger(NodeControllerContainerInit.class.getCanonicalName());
 
     public static void main(String [] args) {
 
         NodeControllerConfig nodeConfig = NodeControllerConfig.INSTANCE;
 
-        SpringApplication app = new SpringApplication(NodeControllerContainer.class);
+        SpringApplication app = new SpringApplication(NodeControllerContainerInit.class);
         app.setDefaultProperties(Collections.singletonMap("server.port", nodeConfig.getNodeControllerPort()));
         app.run();
 
@@ -58,13 +61,17 @@ public class NodeControllerContainer {
         LOG.info("Start Node Resource manager");
         ResourceManager.getInstance().run();
 
+        if (!"true".equals(System.getenv("SP_DEBUG"))) {
+            LOG.info("Auto-deploy StreamPipes node container");
+            DockerContainerOrchestrator.getInstance().init();
+        }
+
         // registration with consul here
         ConsulUtil.registerNodeControllerService(
                 nodeConfig.getNodeServiceId(),
                 nodeConfig.getNodeHostName(),
                 nodeConfig.getNodeControllerPort()
         );
-
     }
 
     @PreDestroy
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
index e3ea78e..b2eef3c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container.config;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,20 +15,22 @@ package org.apache.streampipes.node.controller.container.config;
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.config;
 
 public class ConfigKeys {
-    final static String NODE_CONTROLLER_ID_KEY = "SP_NODE_CONTROLLER_ID";
-    final static String NODE_CONTROLLER_PORT_KEY = "SP_NODE_CONTROLLER_PORT";
-    final static String NODE_HOST_KEY = "SP_NODE_HOST";
-    final static String NODE_BROKER_HOST_KEY = "SP_NODE_BROKER_HOST";
-    final static String NODE_BROKER_PORT_KEY = "SP_NODE_BROKER_PORT";
-    final static String NODE_LOCATION_KEY = "SP_NODE_LOCATION";
-    final static String NODE_HAS_GPU_KEY = "SP_NODE_HAS_GPU";
-    final static String NODE_GPU_CUDA_CORES_KEY = "SP_NODE_GPU_CUDA_CORES";
-    final static String NODE_GPU_TYPE_KEY = "SP_NODE_GPU_TYPE";
-    final static String NODE_ACCESSIBLE_SENSOR_ACTUATOR_KEY = "SP_NODE_ACCESSIBLE_SENSOR_ACTUATOR";
-    final static String NODE_SUPPORTED_PE_APP_ID_KEY = "SP_NODE_SUPPORTED_PE_APP_ID";
-    final static String DOCKER_PRUNING_FREQ_SECS_KEY = "SP_DOCKER_PRUNING_FREQ_SECS";
-    final static String NODE_RESOURCE_UPDATE_FREQ_SECS_KEY = "SP_NODE_RESOURCE_UPDATE_FREQ_SECS";
-    final static String NODE_EVENT_BUFFER_SIZE = "SP_NODE_EVENT_BUFFER_SIZE";
+    public static final String NODE_TYPE = "SP_NODE_TYPE";
+    public static final String NODE_CONTROLLER_ID_KEY = "SP_NODE_CONTROLLER_ID";
+    public static final String NODE_CONTROLLER_PORT_KEY = "SP_NODE_CONTROLLER_PORT";
+    public static final String NODE_HOST_NAME_KEY = "SP_NODE_HOST";
+    public static final String NODE_BROKER_HOST_KEY = "SP_NODE_BROKER_HOST";
+    public static final String NODE_BROKER_PORT_KEY = "SP_NODE_BROKER_PORT";
+    public static final String NODE_LOCATION_KEY = "SP_NODE_LOCATION";
+    public static final String NODE_HAS_GPU_KEY = "SP_NODE_HAS_GPU";
+    public static final String NODE_GPU_CUDA_CORES_KEY = "SP_NODE_GPU_CUDA_CORES";
+    public static final String NODE_GPU_TYPE_KEY = "SP_NODE_GPU_TYPE";
+    public static final String NODE_ACCESSIBLE_SENSOR_ACTUATOR_KEY = "SP_NODE_ACCESSIBLE_SENSOR_ACTUATOR";
+    public static final String NODE_SUPPORTED_PE_APP_ID_KEY = "SP_NODE_SUPPORTED_PE_APP_ID";
+    public static final String DOCKER_PRUNING_FREQ_SECS_KEY = "SP_DOCKER_PRUNING_FREQ_SECS";
+    public static final String NODE_RESOURCE_UPDATE_FREQ_SECS_KEY = "SP_NODE_RESOURCE_UPDATE_FREQ_SECS";
+    public static final String NODE_EVENT_BUFFER_SIZE = "SP_NODE_EVENT_BUFFER_SIZE";
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
index 4f9e32f..139d88d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
@@ -28,87 +28,69 @@ public enum NodeControllerConfig {
     private SpConfig config;
 
     private static final String SLASH = "/";
-    private static final String DOT = ".";
-    private static final String DEFAULT_NODE_BROKER_NAME_SUFFIX = "broker";
-    private static final String node_service_id = "node/org.apache.streampipes.node.controller";
+    private static final String NODE_SERVICE_ID = "node/org.apache.streampipes.node.controller";
 
     private static final String DEFAULT_NODE_CONTROLLER_ID = "node-controller";
     private static final int DEFAULT_NODE_CONTROLLER_PORT = 7077;
+    private static final String DEFAULT_NODE_TYPE = "edge";
     private static final String DEFAULT_NODE_BROKER_HOST = "node-broker";
     private static final int DEFAULT_NODE_BROKER_PORT = 1883;
     private static final String DEFAULT_NODE_HOST_NAME = "host.docker.internal";
 
     // Node controller configs
-    private static final int DEFAULT_DOCKER_PRUNING_FREQ_SECS = 60;
+    private static final int DEFAULT_DOCKER_PRUNING_FREQ_SECS = 3600;
     private static final int DEFAULT_NODE_RESOURCE_UPDATE_FREQ_SECS = 30;
     private static final int DEFAULT_EVENT_BUFFER_SIZE = 1000;
 
     NodeControllerConfig() {
-        config = SpConfig.getSpConfig(node_service_id + SLASH + getNodeHostName());
+        config = SpConfig.getSpConfig(NODE_SERVICE_ID + SLASH + getNodeHostName());
 
         config.register(ConfigKeys.NODE_CONTROLLER_ID_KEY, DEFAULT_NODE_CONTROLLER_ID, "node controller id");
-        config.register(ConfigKeys.NODE_CONTROLLER_PORT_KEY,DEFAULT_NODE_CONTROLLER_PORT, "node controller port");
-        config.register(ConfigKeys.NODE_HOST_KEY, "host.docker.internal", "node host name");
-        config.register(ConfigKeys.NODE_LOCATION_KEY, "", "node location");
+        config.register(ConfigKeys.NODE_CONTROLLER_PORT_KEY, DEFAULT_NODE_CONTROLLER_PORT, "node controller port");
+        config.register(ConfigKeys.NODE_HOST_NAME_KEY, DEFAULT_NODE_HOST_NAME, "node host name");
+        config.register(ConfigKeys.NODE_TYPE, DEFAULT_NODE_TYPE, "node type");
         config.register(ConfigKeys.NODE_BROKER_HOST_KEY, DEFAULT_NODE_BROKER_HOST, "node broker host");
         config.register(ConfigKeys.NODE_BROKER_PORT_KEY, DEFAULT_NODE_BROKER_PORT, "node broker port");
-
     }
 
     public String getNodeServiceId() {
-        return node_service_id;
+        return NODE_SERVICE_ID;
     }
 
-    /**
-     *
-     * @return node host name (physical DNS name or IP)
-     */
     public String getNodeHostName(){
-        return envExist(ConfigKeys.NODE_HOST_KEY) ? getEnvAsString(ConfigKeys.NODE_HOST_KEY) :
-                DEFAULT_NODE_HOST_NAME;
+        return getEnvOrDefault(ConfigKeys.NODE_HOST_NAME_KEY,
+                DEFAULT_NODE_HOST_NAME,
+                String.class);
     }
 
-    /**
-     *
-     * @return node-controller id
-     */
     public String getNodeControllerId() {
-        return envExist(ConfigKeys.NODE_CONTROLLER_ID_KEY) ? getEnvAsString(ConfigKeys.NODE_CONTROLLER_ID_KEY) :
-                DEFAULT_NODE_BROKER_HOST;
+        return getEnvOrDefault(ConfigKeys.NODE_CONTROLLER_ID_KEY,
+                DEFAULT_NODE_CONTROLLER_ID,
+                String.class);
     }
 
-    /**
-     *
-     * @return node-controller port
-     */
     public int getNodeControllerPort(){
-        return envExist(ConfigKeys.NODE_CONTROLLER_PORT_KEY) ? getEnvAsInteger(ConfigKeys.NODE_CONTROLLER_PORT_KEY) :
-                DEFAULT_NODE_CONTROLLER_PORT;
+        return getEnvOrDefault(
+                ConfigKeys.NODE_CONTROLLER_PORT_KEY,
+                DEFAULT_NODE_CONTROLLER_PORT,
+                Integer.class);
     }
 
-    /**
-     *
-     * @return node broker host
-     */
     public String getNodeBrokerHost() {
-        return envExist(ConfigKeys.NODE_BROKER_HOST_KEY) ? getEnvAsString(ConfigKeys.NODE_BROKER_HOST_KEY) :
-                DEFAULT_NODE_BROKER_HOST;
+        return getEnvOrDefault(
+                ConfigKeys.NODE_BROKER_HOST_KEY,
+                DEFAULT_NODE_BROKER_HOST,
+                String.class);
     }
 
-    /**
-     *
-     * @return node broker port
-     */
     // TODO: should be flexibly set due to node broker technology used
     public int getNodeBrokerPort() {
-        return envExist(ConfigKeys.NODE_BROKER_PORT_KEY) ? getEnvAsInteger(ConfigKeys.NODE_BROKER_PORT_KEY) :
-                DEFAULT_NODE_BROKER_PORT;
+        return getEnvOrDefault(
+                ConfigKeys.NODE_BROKER_PORT_KEY,
+                DEFAULT_NODE_BROKER_PORT,
+                Integer.class);
     }
 
-    /**
-     *
-     * @return node location tags
-     */
     public List<String> getNodeLocations() {
         return System.getenv()
                 .entrySet()
@@ -118,10 +100,6 @@ public enum NodeControllerConfig {
                 .collect(Collectors.toList());
     }
 
-    /**
-     *
-     * @return supported pipeline elements that can run on this node
-     */
     // TODO: get supported PE programmatically instead of environment variables
     public List<String> getSupportedPipelineElements() {
         return System.getenv()
@@ -132,10 +110,6 @@ public enum NodeControllerConfig {
                 .collect(Collectors.toList());
     }
 
-    /**
-     *
-     * @return return sensors/actuators that are accessible
-     */
     public List<AccessibleSensorActuatorResource> getAccessibleSensorActuator(){
         return System.getenv()
                 .entrySet()
@@ -152,71 +126,62 @@ public enum NodeControllerConfig {
                 .collect(Collectors.toList());
     }
 
-    /**
-     *
-     * @return true if node has cuda capable GPU
-     */
     public boolean hasNodeGpu(){
-        return envExist(ConfigKeys.NODE_HAS_GPU_KEY) ? getEnvAsBoolean(ConfigKeys.NODE_HAS_GPU_KEY) : false;
+        return getEnvOrDefault(
+                ConfigKeys.NODE_HAS_GPU_KEY,
+                false,
+                Boolean.class);
     }
 
-    /**
-     *
-     * @return number of cuda cores
-     */
     public int getGpuCores() {
-        return envExist(ConfigKeys.NODE_GPU_CUDA_CORES_KEY) ? getEnvAsInteger(ConfigKeys.NODE_GPU_CUDA_CORES_KEY) : 0;
+        return getEnvOrDefault(
+                ConfigKeys.NODE_GPU_CUDA_CORES_KEY,
+                0,
+                Integer.class);
     }
 
-    /**
-     *
-     * @return specific node GPU type
-     */
     public String getGpuType() {
-        return envExist(ConfigKeys.NODE_GPU_TYPE_KEY) ? getEnvAsString(ConfigKeys.NODE_GPU_TYPE_KEY) : "n/a";
+        return getEnvOrDefault(
+                ConfigKeys.NODE_GPU_TYPE_KEY,
+                "n/a",
+                String.class);
     }
 
-    /**
-     *
-     * @return docker pruning frequency
-     */
     public int getPruningFreq() {
-        return envExist(ConfigKeys.DOCKER_PRUNING_FREQ_SECS_KEY) ? getEnvAsInteger(ConfigKeys.DOCKER_PRUNING_FREQ_SECS_KEY) :
-                DEFAULT_DOCKER_PRUNING_FREQ_SECS;
+        return getEnvOrDefault(
+                ConfigKeys.DOCKER_PRUNING_FREQ_SECS_KEY,
+                DEFAULT_DOCKER_PRUNING_FREQ_SECS,
+                Integer.class);
     }
 
-    /**
-     *
-     * @return node resource update frequency
-     */
     public int getNodeResourceUpdateFreqSecs() {
-        return envExist(ConfigKeys.NODE_RESOURCE_UPDATE_FREQ_SECS_KEY) ? getEnvAsInteger(ConfigKeys.NODE_RESOURCE_UPDATE_FREQ_SECS_KEY) :
-                DEFAULT_NODE_RESOURCE_UPDATE_FREQ_SECS;
+        return getEnvOrDefault(
+                ConfigKeys.NODE_RESOURCE_UPDATE_FREQ_SECS_KEY,
+                DEFAULT_NODE_RESOURCE_UPDATE_FREQ_SECS,
+                Integer.class);
     }
 
     public int getEventBufferSize() {
-        return envExist(ConfigKeys.NODE_EVENT_BUFFER_SIZE) ? getEnvAsInteger(ConfigKeys.NODE_EVENT_BUFFER_SIZE) :
-                DEFAULT_EVENT_BUFFER_SIZE;
-    }
-
-    private boolean envExist(String key) {
-        return System.getenv(key) != null;
-    }
-
-    private String getEnv(String key) {
-        return System.getenv(key);
-    }
-
-    private String getEnvAsString(String key) {
-        return String.valueOf(getEnv(key));
+        return getEnvOrDefault(
+                ConfigKeys.NODE_EVENT_BUFFER_SIZE,
+                DEFAULT_EVENT_BUFFER_SIZE,
+                Integer.class);
     }
 
-    private int getEnvAsInteger(String key) {
-        return Integer.parseInt(getEnv(key));
+    public String getNodeType() {
+        return getEnvOrDefault(
+                ConfigKeys.NODE_TYPE,
+                DEFAULT_NODE_TYPE,
+                String.class);
     }
 
-    private boolean getEnvAsBoolean(String key) {
-        return Boolean.parseBoolean(getEnv(key));
+    private <T> T getEnvOrDefault(String k, T defaultValue, Class<T> type) {
+        String raw = System.getenv(k); // read once; null means the variable is unset
+        if (raw == null) {
+            return defaultValue;
+        } else if (type.equals(Integer.class)) {
+            return type.cast(Integer.valueOf(raw)); // checked cast, no (T) warning
+        }
+        return type.cast(type.equals(Boolean.class) ? Boolean.valueOf(raw) : raw);
+    }
-
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
similarity index 83%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java
copy to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
index 7961894..e049a80 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+package org.apache.streampipes.node.controller.container.management;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,10 +16,13 @@ package org.apache.streampipes.node.controller.container.management.container;/*
  *
  */
 
-public enum ContainerStatus {
-    DEPLOYED,
-    RUNNING,
-    STOPPED,
-    REMOVED,
-    UNKNOWN
+public interface IRunningInstances<T> {
+
+    void add(String id, T value);
+
+    boolean isRunning(String id);
+
+    T get(String id);
+
+    void remove(String id);
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
index 246dd37..8c6f628 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
@@ -26,8 +26,8 @@ import org.apache.streampipes.model.node.resources.interfaces.AccessibleSensorAc
 import org.apache.streampipes.model.node.resources.software.SoftwareResource;
 import org.apache.streampipes.model.node.resources.software.Docker;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-import org.apache.streampipes.node.controller.container.management.container.DockerInfo;
-import org.apache.streampipes.node.controller.container.management.container.DockerUtils;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerInfo;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import oshi.SystemInfo;
@@ -74,6 +74,7 @@ public class NodeInfoStorage {
 
         NodeInfo nodeInfo = NodeInfoBuilder.create(getNodeControllerId())
                 .withNodeControllerPort(getNodeControllerPort())
+                .withNodeType(getNodeType())
                 .withNodeHost(getNodeHost())
                 .withNodeLocation(getNodeLocation())
                 .withNodeModel(getNodeModel())
@@ -85,6 +86,10 @@ public class NodeInfoStorage {
         NodeInfoStorage.getInstance().add(nodeInfo);
     }
 
+    private static String getNodeType() {
+        return NodeControllerConfig.INSTANCE.getNodeType();
+    }
+
     private static String getNodeControllerId(){
         return NodeControllerConfig.INSTANCE.getNodeControllerId();
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java
index ba36aab..74deddc 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java
@@ -17,7 +17,7 @@ package org.apache.streampipes.node.controller.container.management.node;/*
  */
 
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-import org.apache.streampipes.node.controller.container.management.container.DockerUtils;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerOrchestrator.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerOrchestrator.java
similarity index 95%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerOrchestrator.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerOrchestrator.java
index 5f59c9f..fda9ae2 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerOrchestrator.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerOrchestrator.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,13 +15,18 @@ package org.apache.streampipes.node.controller.container.management.container;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.orchestrator;
 
 import org.apache.streampipes.model.node.PipelineElementDockerContainer;
 
 public interface ContainerOrchestrator {
 
+   void init();
+
    String deploy(PipelineElementDockerContainer p);
 
    String remove(PipelineElementDockerContainer p);
 
+   String list();
+
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java
similarity index 97%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java
index 7961894..00a2c75 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.container;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.orchestrator;
 
 public enum ContainerStatus {
     DEPLOYED,
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
similarity index 63%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
copy to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
index 276f488..58a88b1 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+package org.apache.streampipes.node.controller.container.management.orchestrator;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,30 +16,34 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
  *
  */
 
+import org.apache.streampipes.model.node.PipelineElementDockerContainer;
+import org.apache.streampipes.node.controller.container.management.IRunningInstances;
+
 import java.util.HashMap;
 import java.util.Map;
 
-public enum RunningRelayInstances {
+public enum RunningContainerInstances implements IRunningInstances<PipelineElementDockerContainer> {
     INSTANCE;
 
-    private final Map<String, EventRelayManager> runningInstances = new HashMap<>();
+    private final Map<String, PipelineElementDockerContainer> runningInstances = new HashMap<>();
 
-    // TODO: persist active relays to support failure handling
-    public void addRelay(String id, EventRelayManager eventRelayManager) {
-        runningInstances.put(id, eventRelayManager);
+    @Override
+    public void add(String id, PipelineElementDockerContainer container) {
+        runningInstances.put(id, container);
     }
 
+    @Override
     public boolean isRunning(String id) {
         return runningInstances.get(id) != null;
     }
 
-    public EventRelayManager get(String id) {
+    @Override
+    public PipelineElementDockerContainer get(String id) {
         return isRunning(id) ? runningInstances.get(id) : null;
     }
 
-    public EventRelayManager removeRelay(String id) {
-        EventRelayManager eventRelayManager = runningInstances.get(id);
+    @Override
+    public void remove(String id) {
         runningInstances.remove(id);
-        return eventRelayManager;
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerOrchestratorManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
similarity index 78%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerOrchestratorManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
index b47a218..943a5bd 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerOrchestratorManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,44 +18,48 @@ package org.apache.streampipes.node.controller.container.management.container;/*
 
 import com.google.common.collect.ImmutableMap;
 import com.google.gson.Gson;
-import com.google.gson.JsonArray;
 import com.spotify.docker.client.exceptions.DockerException;
 import com.spotify.docker.client.exceptions.NotFoundException;
 import com.spotify.docker.client.messages.Container;
 import org.apache.commons.lang.StringUtils;
 import org.apache.streampipes.container.util.ConsulUtil;
 import org.apache.streampipes.model.node.PipelineElementDockerContainer;
+import org.apache.streampipes.node.controller.container.management.orchestrator.ContainerOrchestrator;
+import org.apache.streampipes.node.controller.container.management.orchestrator.ContainerStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.*;
 
-public class DockerOrchestratorManager implements ContainerOrchestrator {
+public class DockerContainerOrchestrator implements ContainerOrchestrator {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(DockerOrchestratorManager.class.getCanonicalName());
+            LoggerFactory.getLogger(DockerContainerOrchestrator.class.getCanonicalName());
 
-    private DockerUtils docker = DockerUtils.getInstance();
+    private final DockerUtils docker = DockerUtils.getInstance();
+    private static volatile DockerContainerOrchestrator instance = null; // volatile: the unsynchronized first check in getInstance() must never observe a half-published instance
 
-    private static DockerOrchestratorManager instance = null;
+    private DockerContainerOrchestrator() {}
 
-    private DockerOrchestratorManager() {}
-
-    public static DockerOrchestratorManager getInstance() {
+    public static DockerContainerOrchestrator getInstance() {
         if (instance == null) {
-            synchronized (DockerOrchestratorManager.class) {
+            synchronized (DockerContainerOrchestrator.class) {
                 if (instance == null)
-                    instance = new DockerOrchestratorManager();
+                    instance = new DockerContainerOrchestrator();
             }
         }
         return instance;
     }
 
+    @Override
+    public void init() {
+        DockerNodeContainer.INSTANCE.get().forEach(this::deploy);
+    }
 
     @Override
     public String deploy(PipelineElementDockerContainer p) {
-        LOG.info("Pull image and deploy pipeline element container {}", p.getContainerName());
+        LOG.info("Pull image and deploy pipeline element container {}", p.getImageURI());
 
         Optional<Container> containerOptional = docker.getContainer(p.getContainerName());
         if (!containerOptional.isPresent()) {
@@ -84,7 +88,7 @@ public class DockerOrchestratorManager implements ContainerOrchestrator {
 
     @Override
     public String remove(PipelineElementDockerContainer p) {
-        LOG.info("Remove pipeline element container: {}", p.getContainerName());
+        LOG.info("Remove pipeline element container: {}", p.getImageURI());
 
         Optional<Container> containerOptional = docker.getContainer(p.getContainerName());
         if(containerOptional.isPresent()) {
@@ -110,19 +114,31 @@ public class DockerOrchestratorManager implements ContainerOrchestrator {
         return new Gson().toJson(m);
     }
 
+    /**
+     * Lists the running pipeline element containers.
+     *
+     * @return JSON array with one object per container (containerName, containerId, image,
+     *         state, status, labels); "[]" when no container is running
+     */
+    @Override
     public String list() {
         LOG.info("List running pipeline element container");
 
         List<Container> containerList = docker.getRunningPipelineElementContainer();
-
-        if (containerList.size() > 0) {
-            List<String> l = new ArrayList<>();
-            for (Container c: containerList) {
-                l.add(StringUtils.remove(c.names().get(0), "/"));
-            }
-            return new Gson().toJson(l);
+        // One map per container: a single shared map would be overwritten on every
+        // iteration, so only the last container would ever be reported.
+        List<Map<String, Object>> containers = new ArrayList<>();
+        for (Container c: containerList) {
+            Map<String, Object> m = new HashMap<>();
+            m.put("containerName", StringUtils.remove(c.names().get(0), "/"));
+            m.put("containerId", c.id());
+            m.put("image", c.image());
+            m.put("state", c.state());
+            m.put("status", c.status());
+            m.put("labels", c.labels());
+            containers.add(m);
         }
-        return new Gson().toJson(new JsonArray());
+        return new Gson().toJson(containers);
     }
 
     private String deployPipelineElementContainer(PipelineElementDockerContainer p) throws Exception {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerInfo.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerInfo.java
similarity index 99%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerInfo.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerInfo.java
index 7a2d887..50b5cef 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerInfo.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerInfo.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.container;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
 
 public class DockerInfo {
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java
new file mode 100644
index 0000000..ed5dd38
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
+
+import org.apache.streampipes.model.node.PipelineElementDockerContainer;
+import org.apache.streampipes.model.node.PipelineElementDockerContainerBuilder;
+import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Describes the containers that {@link DockerContainerOrchestrator#init()} deploys.
+ */
+public enum DockerNodeContainer {
+    INSTANCE;
+
+    /**
+     * Builds the descriptions of the pipeline element container and the node broker container.
+     *
+     * @return a new list holding the pipeline element container followed by the node broker
+     */
+    public List<PipelineElementDockerContainer> get() {
+        //NodeInfoStorage.getInstance().retrieveNodeInfo().getNodeResources().getHardwareResource().getCpu().getArch();
+        List<PipelineElementDockerContainer> nodeContainers = new ArrayList<>();
+
+        // PE: pipeline element JVM all container
+        // NOTE(review): SP_NODE_ID is read from getNodeBrokerHost() and SP_NODE_CONTROLLER_HOST from
+        // getNodeControllerId() - confirm that these getters are the intended sources.
+        PipelineElementDockerContainer processor =
+                PipelineElementDockerContainerBuilder.create("pe/org.apache.streampipes.processors.all.jvm")
+                        .withImage("apachestreampipes/pipeline-elements-all-jvm:0.68.0-SNAPSHOT")
+                        .withName("streampipes_pipeline-elements-all-jvm")
+                        .withExposedPorts(new String[]{"7023"})
+                        .withEnvironmentVariables(Arrays.asList(
+                                "SP_NODE_ID=" + NodeControllerConfig.INSTANCE.getNodeBrokerHost(),
+                                "SP_NODE_CONTROLLER_HOST=" + NodeControllerConfig.INSTANCE.getNodeControllerId(),
+                                "SP_NODE_CONTROLLER_PORT=" + NodeControllerConfig.INSTANCE.getNodeControllerPort()
+                        ))
+                        .withLabels(labels(
+                                "org.apache.streampipes.container.type", "pipeline-element",
+                                "org.apache.streampipes.container.node.type", "edge"))
+                        .build();
+
+        // Node broker: Mosquitto
+        // NOTE(review): these label keys use ".pe.container." while the processor above uses
+        // ".container." - confirm which prefix the label-based container lookup expects.
+        PipelineElementDockerContainer nodeBroker =
+                PipelineElementDockerContainerBuilder.create("pe/org.apache.streampipes.node.broker")
+                        .withImage("eclipse-mosquitto:1.6.12")
+                        .withName("streampipes_node-broker")
+                        .withExposedPorts(new String[]{"1883"})
+                        .withLabels(labels(
+                                "org.apache.streampipes.pe.container.type", "broker",
+                                "org.apache.streampipes.pe.container.location", "edge"))
+                        .build();
+
+        nodeContainers.add(processor);
+        nodeContainers.add(nodeBroker);
+
+        return nodeContainers;
+    }
+
+    /**
+     * Creates a plain two-entry HashMap. Used instead of double-brace initialization, which
+     * creates an anonymous HashMap subclass that keeps a reference to the enclosing instance.
+     */
+    private static HashMap<String, String> labels(String key1, String value1, String key2, String value2) {
+        HashMap<String, String> labels = new HashMap<>();
+        labels.put(key1, value1);
+        labels.put(key2, value2);
+        return labels;
+    }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerUtils.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
similarity index 93%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerUtils.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
index 1a91296..8793685 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerUtils.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,8 +16,6 @@ package org.apache.streampipes.node.controller.container.management.container;/*
  *
  */
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.spotify.docker.client.DefaultDockerClient;
@@ -37,15 +35,12 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 public class DockerUtils {
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(DockerUtils.class.getCanonicalName());
+    private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class.getCanonicalName());
 
     private static final String SP_NETWORK = "spnet";
     private static final String SP_CONTAINER_PREFIX = "streampipes_";
     private static final String DOCKER_UNIX_SOCK = "/var/run/docker.sock";
     private static final String BLANK_SPACE = " ";
-
     private static DockerUtils instance;
     private static DockerClient docker;
 
@@ -108,10 +103,10 @@ public class DockerUtils {
     private String getContainerIdByName(String containerName) {
         LOG.info("Get containerId by container name");
         try {
-            List<Container> containerList = docker.listContainers();
-            return containerList.stream()
-                    .findAny()
-                    .filter(c -> c.names().get(0).contains(containerName))
+            return docker.listContainers()
+                    .stream() // filter the stream itself so every container is checked, not one arbitrary pick
+                    .filter(c -> c.names().get(0).contains(verifyContainerName(containerName)))
+                    .findAny()
                     .get()
                     .id();
         } catch (DockerException | InterruptedException e) {
@@ -123,16 +118,19 @@ public class DockerUtils {
 
     public String createContainer(PipelineElementDockerContainer p) {
         LOG.info("Create pipeline element container {}", p.getContainerName());
-        ContainerCreation creation;
         try {
-            creation = docker.createContainer(getContainerConfig(p), SP_CONTAINER_PREFIX + p.getContainerName());
-            return creation.id();
+            return docker.createContainer(getContainerConfig(p), verifyContainerName(p.getContainerName())).id();
         } catch (DockerException | InterruptedException e) {
             LOG.error("Pipeline element container could not be created. {}", e.toString());
         }
         return "";
     }
 
+    private String verifyContainerName(String containerName) {
+        return containerName.startsWith(SP_CONTAINER_PREFIX) ?
+                containerName : SP_CONTAINER_PREFIX + containerName;
+    }
+
     private ContainerConfig getContainerConfig(PipelineElementDockerContainer p) {
         return ContainerConfig.builder()
                 .hostname(p.getContainerName())
@@ -146,10 +144,9 @@ public class DockerUtils {
     }
 
     public Optional<Container> getContainer(String containerName) {
-        LOG.info("Get container: {}", containerName);
-        List<Container> containers = getContainerList();
-        return containers.stream()
-                .filter(c -> c.names().get(0).contains(containerName))
+        return getContainerList()
+                .stream()
+                .filter(c -> c.names().get(0).contains(verifyContainerName(containerName)))
                 .findAny();
     }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
index f574410..cc648f4 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
@@ -16,24 +16,29 @@ package org.apache.streampipes.node.controller.container.management.pe;/*
  *
  */
 
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.container.model.node.InvocableRegistration;
+import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collections;
 
 public class PipelineElementManager {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(PipelineElementManager.class.getCanonicalName());
 
+    private static final String PROTOCOL = "http://";
+    private static final String COLON = ":";
+    private static final String SLASH = "/";
+    private static final String ENV_CONSUL_LOCATION = "CONSUL_LOCATION";
     private static final Integer CONNECT_TIMEOUT = 10000;
-
     private static PipelineElementManager instance = null;
 
     private PipelineElementManager() {}
@@ -49,11 +54,34 @@ public class PipelineElementManager {
     }
 
     /**
-     * registeration of newly started pipeline element runtime container
+     * Registers a pipeline element container as a Consul service and stores its
+     * supported pipeline element app ids in the local node info.
+     * @param invocableRegistration carries the Consul registration body and the supported app ids
      */
+    public void registerPipelineElements(InvocableRegistration invocableRegistration) {
+        try {
+            String body = JacksonSerializer.getObjectMapper()
+                    .writeValueAsString(invocableRegistration.getConsulServiceRegistrationBody());
+            // discardContent() releases the connection; an unconsumed fluent Response keeps it leased
+            Request.Put(makeConsulRegistrationEndpoint())
+                    .addHeader("accept", "application/json")
+                    .bodyString(body, ContentType.APPLICATION_JSON)
+                    .execute()
+                    .discardContent();
+            // TODO: persistent storage to survive failures
+            NodeInfoStorage.getInstance().retrieveNodeInfo()
+                    .setSupportedPipelineElementAppIds(invocableRegistration.getSupportedPipelineElementAppIds());
+        } catch (IOException e) {
+            LOG.error("Could not register pipeline elements in Consul", e);
+        }
+    }
 
     /**
-     * invokes pipeline elements when pipeline is started
+     * Invoke pipeline elements when pipeline is started
+     *
+     * @param pipelineElementEndpoint
+     * @param payload
+     * @return
      */
     public String invokePipelineElement(String pipelineElementEndpoint, String payload) {
         LOG.info("Invoking element: {}", pipelineElementEndpoint);
@@ -78,4 +106,28 @@ public class PipelineElementManager {
 
     }
 
+    public void unregisterPipelineElements(){
+        // reset the supported app ids kept in the node info to an empty list
+        NodeInfoStorage.getInstance().retrieveNodeInfo()
+                .setSupportedPipelineElementAppIds(Collections.emptyList());
+    }
+
+    /**
+     * Builds the URL of the Consul agent's service registration endpoint.
+     *
+     * The host comes from the CONSUL_LOCATION environment variable and defaults
+     * to localhost when the variable is not set; the port is fixed to 8500.
+     *
+     * @return the registration URL, e.g. http://localhost:8500/v1/agent/service/register
+     */
+    private String makeConsulRegistrationEndpoint() {
+        String consulHost = System.getenv(ENV_CONSUL_LOCATION);
+        if (consulHost == null) {
+            consulHost = "localhost";
+        }
+        String authority = consulHost + COLON + "8500";
+        String path = "v1/agent/service/register";
+        return PROTOCOL + authority + SLASH + path;
+    }
+
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index 276f488..09e373f 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -16,30 +16,34 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
  *
  */
 
+import org.apache.streampipes.node.controller.container.management.IRunningInstances;
+
 import java.util.HashMap;
 import java.util.Map;
 
-public enum RunningRelayInstances {
+public enum RunningRelayInstances implements IRunningInstances<EventRelayManager> {
     INSTANCE;
 
     private final Map<String, EventRelayManager> runningInstances = new HashMap<>();
 
     // TODO: persist active relays to support failure handling
-    public void addRelay(String id, EventRelayManager eventRelayManager) {
+    @Override
+    public void add(String id, EventRelayManager eventRelayManager) {
         runningInstances.put(id, eventRelayManager);
     }
 
+    @Override
     public boolean isRunning(String id) {
         return runningInstances.get(id) != null;
     }
 
+    @Override
     public EventRelayManager get(String id) {
         return isRunning(id) ? runningInstances.get(id) : null;
     }
 
-    public EventRelayManager removeRelay(String id) {
-        EventRelayManager eventRelayManager = runningInstances.get(id);
+    @Override
+    public void remove(String id) {
         runningInstances.remove(id);
-        return eventRelayManager;
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
index e9b5c31..0affe45 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
@@ -18,7 +18,6 @@ package org.apache.streampipes.node.controller.container.management.resource;/*
 
 import com.google.gson.Gson;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-import org.apache.streampipes.node.controller.container.management.container.DockerUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import oshi.SystemInfo;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
index 0ff1ca6..00dface 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
@@ -35,7 +35,7 @@ public class DebugRelayResource extends AbstractNodeContainerResource {
         System.out.println(msg);
         EventRelayManager eventRelayManager = new EventRelayManager();
         eventRelayManager.start();
-        RunningRelayInstances.INSTANCE.addRelay(eventRelayManager.getRelayedTopic(), eventRelayManager);
+        RunningRelayInstances.INSTANCE.add(eventRelayManager.getRelayedTopic(), eventRelayManager);
 
         return ok();
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java
similarity index 70%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java
index 091331b..e1e302c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.rest;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,18 +15,21 @@ package org.apache.streampipes.node.controller.container.rest;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.rest;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.container.transform.Transformer;
 import org.apache.streampipes.container.util.Util;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.node.PipelineElementDockerContainer;
-import org.apache.streampipes.node.controller.container.management.container.DockerOrchestratorManager;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
 import org.apache.streampipes.node.controller.container.management.pe.PipelineElementManager;
 import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
 import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,64 +39,44 @@ import javax.ws.rs.core.Response;
 import java.io.IOException;
 
 @Path("/node/container")
-public class PELifeCycleResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
-    private static final Logger LOG =
-            LoggerFactory.getLogger(PELifeCycleResource.class.getCanonicalName());
+public class InvocableManagementResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
+    private static final Logger LOG = LoggerFactory.getLogger(InvocableManagementResource.class.getCanonicalName());
 
-    private static final String COLON = ":";
-
-    /**
-     *
-     * @return a list of currently running Docker Containers
-     */
     @GET
     @Produces(MediaType.APPLICATION_JSON)
     public Response getPipelineElementContainer(){
-        return ok(DockerOrchestratorManager.getInstance().list());
+        return ok(DockerContainerOrchestrator.getInstance().list());
     }
 
-    /**
-     * Deploys a new Docker Container
-     *
-     * @param container to be deployed
-     * @return deployment status
-     */
     @POST
     @Path("/deploy")
     @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
     public Response deployPipelineElementContainer(PipelineElementDockerContainer container) {
-        return ok(DockerOrchestratorManager.getInstance().deploy(container));
+        return ok(DockerContainerOrchestrator.getInstance().deploy(container));
     }
 
-    /**
-     * Register pipeline elements in consul
-     * @param message
-     * @return
-     */
     @POST
     @Path("/register")
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response registerPipelineElementInConsul(String message) {
-        // TODO implement
+    public void register(String body) {
+        try {
+            InvocableRegistration invocableRegistration = JacksonSerializer
+                    .getObjectMapper()
+                    .readValue(body, InvocableRegistration.class);
 
-//        HttpClient client = HttpClients.custom().build();
-//        HttpUriRequest request = RequestBuilder.put()
-//                .setUri("http://localhost:8500/v1/agent/service/register")
-//                .setEntity(new StringEntity(message))
-//                .setHeader(HttpHeaders.CONTENT_TYPE, "application/json")
-//                .build();
-//        client.execute(request);
+            // register pipeline elements at consul and node controller
+            PipelineElementManager.getInstance().registerPipelineElements(invocableRegistration);
+            LOG.info("Sucessfully registered pipeline element container");
 
-        return ok();
+        } catch (IOException e) {
+            LOG.error("Could not register pipeline element container - " + e.toString());
+        }
     }
 
     @POST
     @Path("/invoke/{identifier}/{elementId}")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public Response invokePipelineElement(@PathParam("identifier") String identifier, @PathParam("elementId") String elementId, String payload) {
+    public Response invoke(@PathParam("identifier") String identifier, @PathParam("elementId") String elementId, String payload) {
 
         // TODO implement
         String pipelineElementEndpoint;
@@ -105,7 +88,7 @@ public class PELifeCycleResource<I extends InvocableStreamPipesEntity> extends A
                 // TODO: start event relay to remote broker
 //                EventRelayManager eventRelayManager = new EventRelayManager(graph);
 //                eventRelayManager.start();
-//                RunningRelayInstances.INSTANCE.addRelay(eventRelayManager.getRelayedTopic(), eventRelayManager);
+//                RunningRelayInstances.INSTANCE.add(eventRelayManager.getRelayedTopic(), eventRelayManager);
 
                 PipelineElementManager.getInstance().invokePipelineElement(graph.getBelongsTo(), payload);
             }
@@ -141,8 +124,10 @@ public class PELifeCycleResource<I extends InvocableStreamPipesEntity> extends A
         // TODO implement
 
         // TODO: stop event relay to remote broker
-        EventRelayManager relay = RunningRelayInstances.INSTANCE.removeRelay(appId);
+        EventRelayManager relay = RunningRelayInstances.INSTANCE.get(appId);
+        assert relay != null;
         relay.stop();
+        RunningRelayInstances.INSTANCE.remove(appId);
 
         return ok();
     }
@@ -150,9 +135,9 @@ public class PELifeCycleResource<I extends InvocableStreamPipesEntity> extends A
     @DELETE
     @Path("/remove")
     @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
     public Response removePipelineElementContainer(PipelineElementDockerContainer container) {
-        return ok(DockerOrchestratorManager.getInstance().remove(container));
+        PipelineElementManager.getInstance().unregisterPipelineElements();
+        return ok(DockerContainerOrchestrator.getInstance().remove(container));
     }
 
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
index 68930be..2f6f06d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
@@ -26,7 +26,7 @@ public class NodeControllerResourceConfig extends ResourceConfig {
     public NodeControllerResourceConfig() {
         register(HealthCheckResource.class);
         register(InfoStatusResource.class);
-        register(PELifeCycleResource.class);
+        register(InvocableManagementResource.class);
 
         // TODO remove later - only for local relay tests
         register(DebugRelayResource.class);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index 1899ee1..e29de56 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -54,9 +54,9 @@ public class GraphSubmitter {
 
     graphs.forEach(g -> status.addPipelineElementStatus(new HttpRequestBuilder(g, g.getBelongsTo()).invoke()));
     if (status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess)) {
-      dataSets.forEach(dataSet ->
+        dataSets.forEach(dataSet ->
               status.addPipelineElementStatus
-                      (new HttpRequestBuilder(dataSet, dataSet.getUri()).invoke()));
+                      (new HttpRequestBuilder(dataSet, dataSet.getElementId()).invoke()));
     }
     status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
 
@@ -91,8 +91,8 @@ public class GraphSubmitter {
     status.setPipelineId(pipelineId);
     status.setPipelineName(pipelineName);
 
-    graphs.forEach(g -> status.addPipelineElementStatus(new HttpRequestBuilder(g, g.getUri()).detach()));
-    dataSets.forEach(dataSet -> status.addPipelineElementStatus(new HttpRequestBuilder(dataSet, dataSet.getUri() +
+    graphs.forEach(g -> status.addPipelineElementStatus(new HttpRequestBuilder(g, g.getElementId()).detach()));
+    dataSets.forEach(dataSet -> status.addPipelineElementStatus(new HttpRequestBuilder(dataSet, dataSet.getElementId() +
             "/" +dataSet.getDatasetInvocationId())
             .detach()));
     status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index 80c9acd..f1fe90f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -119,9 +119,9 @@ public class InvocationGraphBuilder {
             }
           }
         } else {
-          t.getInputStreams()
-                  .get(getIndex(source.getDOM(), t))
-                  .setEventGrounding(inputGrounding);
+            t.getInputStreams()
+                    .get(getIndex(source.getDOM(), t))
+                    .setEventGrounding(inputGrounding);
         }
       } else {
         t.getInputStreams()
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
index fb900b7..d045ad5 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
@@ -49,7 +49,7 @@
                 <mat-slide-toggle color="primary" [(ngModel)]="advancedSettings">
                     Choose deployment options
                 </mat-slide-toggle>
-                <mat-divider *ngIf="advancedSettings" style="margin: 2em 0 2em 0;"></mat-divider>
+<!--                <mat-divider *ngIf="advancedSettings" style="margin: 2em 0 2em 0;"></mat-divider>-->
 <!--                <div *ngIf="advancedSettings">-->
 <!--                    <div fxFlex="100" fxLayout="row">-->
 <!--                        <div fxFlex="50" fxLayout="row" fxLayoutAlign="center center">-->
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index cf9b9f3..b40f136 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -122,7 +122,9 @@ export class SavePipelineComponent implements OnInit {
       this.deploymentOptions[pipelineElement.appId] = [];
       this.deploymentOptions[pipelineElement.appId].push(this.makeDefaultNodeInfo());
       edgeNodes.forEach(nodeInfo => {
-        if (nodeInfo.supportedPipelineElementAppIds.some(appId => appId === pipelineElement.appId)) {
+        // only show nodes that actually have supported pipeline elements registered
+        if (nodeInfo.supportedPipelineElementAppIds.length != 0 &&
+            nodeInfo.supportedPipelineElementAppIds.some(appId => appId === pipelineElement.appId)) {
           this.deploymentOptions[pipelineElement.appId].push(nodeInfo);
         }
       })