You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/25 23:06:34 UTC

[incubator-streampipes] branch edge-extensions updated: [WIP] working edge deployments using node controller, some refactoring on existing classes

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new ec16d76  [WIP] working edge deployments using node controller, some refactoring on existing classes
ec16d76 is described below

commit ec16d7653c18c97b0281d6115a7efecec5baec7a
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Thu Nov 26 00:01:13 2020 +0100

    [WIP] working edge deployments using node controller, some refactoring on existing classes
---
 .../streampipes/config/consul/ConsulSpConfig.java  |  81 +++-
 .../development/env                                |   0
 .../master/init/AdapterMasterContainer.java        |   0
 .../standalone/init/StandaloneModelSubmitter.java  |   2 +-
 .../streampipes/container/util/ConsulUtil.java     | 426 ++++++++++++---------
 .../container/util/NodeControllerUtil.java         |  27 +-
 .../model/base/InvocableStreamPipesEntity.java     |  19 -
 .../model/graph/DataProcessorInvocation.java       |   3 +
 .../model/graph/DataSinkInvocation.java            |   3 +
 .../container/NodeControllerContainerInit.java     |  10 +-
 .../container/config/NodeControllerConfig.java     |   4 +-
 .../container/management/IRunningInstances.java    |   3 +-
 .../container/management/info/NodeInfoStorage.java |   5 +-
 .../{node => janitor}/NodeJanitorManager.java      |   3 +-
 .../orchestrator/RunningContainerInstances.java    |   3 +-
 .../docker/DockerContainerOrchestrator.java        |   5 +-
 .../orchestrator/docker/DockerNodeContainer.java   |   7 +-
 .../orchestrator/docker/DockerUtils.java           |   3 +-
 .../ElementLifeCyle.java}                          |  16 +-
 ...ntManager.java => InvocableElementManager.java} |  85 ++--
 .../RunningInvocableInstances.java}                |  21 +-
 .../management/relay/AbstractMqttKafkaBridge.java  |   3 +-
 .../relay/AbstractMqttKafkaConnector.java          |   3 +-
 .../management/relay/AbstractMqttKafkaRelay.java   |   3 +-
 .../management/relay/EventRelayManager.java        |   3 +-
 .../management/relay/MqttKafkaBridge.java          |   4 +-
 .../management/relay/RunningRelayInstances.java    |   3 +-
 .../container/management/relay/model/Metrics.java  |   3 +-
 .../container/management/relay/model/Relay.java    |   3 +-
 .../{resource => resources}/ResourceManager.java   |   3 +-
 .../container/rest/DebugRelayResource.java         |   8 +-
 .../container/rest/InfoStatusResource.java         |   7 +-
 ...tResource.java => InvocableEntityResource.java} |  68 ++--
 .../rest/NodeControllerResourceConfig.java         |   4 +-
 .../manager/endpoint/EndpointFetcher.java          |   2 +-
 .../execution/http/DataSetEntityUrlGenerator.java  |  10 +-
 .../execution/http/EndpointUrlGenerator.java       |   9 +-
 .../http/InvocableEntityUrlGenerator.java          | 143 +++----
 .../manager/matching/InvocationGraphBuilder.java   |   9 +-
 .../manager/node/AvailableNodesFetcher.java        |   2 +-
 .../apache/streampipes/rest/impl/ConsulConfig.java |   2 +-
 .../app/core-model/gen/streampipes-model-client.ts |   2 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  33 +-
 .../save-pipeline/save-pipeline.component.ts       |   2 -
 44 files changed, 581 insertions(+), 474 deletions(-)

diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
index 80226c1..e76a36d 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/consul/ConsulSpConfig.java
@@ -26,8 +26,12 @@ import org.apache.streampipes.config.SpConfig;
 import org.apache.streampipes.config.SpConfigChangeCallback;
 import org.apache.streampipes.config.model.ConfigItem;
 import org.apache.streampipes.config.model.ConfigurationScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
+import java.net.Socket;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -36,9 +40,11 @@ import java.util.Map;
 import java.util.Optional;
 
 public class ConsulSpConfig extends SpConfig implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ConsulSpConfig.class.getCanonicalName());
 
     private static final String CONSUL_ENV_LOCATION = "CONSUL_LOCATION";
-    private static final String NODE_ID_ENV_KEY = "SP_NODE_ID";
+    private static final int CONSUL_DEFAULT_PORT = 8500;
+    private static final String ENV_NODE_CONTROLLER_ID_KEY = "SP_NODE_CONTROLLER_ID";
 
     public static final String SERVICE_ROUTE_PREFIX = "sp/v1/";
     public static final String BASE_PREFIX = "base";
@@ -48,7 +54,7 @@ public class ConsulSpConfig extends SpConfig implements Runnable {
     public static final String SECONDARY_NODE_KEY = "secondary";
 
     private String serviceName;
-    private  KeyValueClient kvClient;
+    private KeyValueClient kvClient;
 
     private List<String> baseConfigKeys = Arrays.asList("SP_HOST", "SP_PORT");
 
@@ -59,32 +65,69 @@ public class ConsulSpConfig extends SpConfig implements Runnable {
 
     public ConsulSpConfig(String serviceName) {
         super(serviceName);
-        //TDOO use consul adress from an environment variable
+        Consul consul = consulInstance();
+        this.kvClient = consul.keyValueClient();
+        this.serviceName = serviceName;
+    }
+
+    public ConsulSpConfig(String serviceName, SpConfigChangeCallback callback) {
+        this(serviceName);
+        this.callback = callback;
+        this.configProps = new HashMap<>();
+        new Thread(this).start();
+    }
+
+    private static Consul consulInstance() {
+        boolean connected = false;
+        URL consulUrl = consulURL();
+
+        while (!connected) {
+            LOG.info("Trying to connect to Consul to register config items");
+            connected = isReady(consulUrl.getHost(), consulUrl.getPort());
+
+            if (!connected) {
+                LOG.info("Retrying in 1 second");
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        LOG.info("Successfully connected to Consul");
+        return Consul.builder().withUrl(consulURL()).build();
+    }
+
+    private static URL consulURL() {
         Map<String, String> env = System.getenv();
-        Consul consul;
+        URL url = null;
+
         if (env.containsKey(CONSUL_ENV_LOCATION)) {
-            URL url = null;
             try {
-                url = new URL("http", env.get(CONSUL_ENV_LOCATION), 8500, "");
+                url = new URL("http", env.get(CONSUL_ENV_LOCATION), CONSUL_DEFAULT_PORT, "");
             } catch (MalformedURLException e) {
                 e.printStackTrace();
             }
-            consul = Consul.builder().withUrl(url).build(); // connect to Consul on localhost
         } else {
-            consul = Consul.builder().build();
+            try {
+                url = new URL("http", "localhost", CONSUL_DEFAULT_PORT, "");
+            } catch (MalformedURLException e) {
+                e.printStackTrace();
+            }
         }
-
-//        Consul consul = Consul.builder().build(); // connect to Consul on localhost
-        kvClient = consul.keyValueClient();
-
-        this.serviceName = serviceName;
+        return url;
     }
 
-    public ConsulSpConfig(String serviceName, SpConfigChangeCallback callback) {
-        this(serviceName);
-        this.callback = callback;
-        this.configProps = new HashMap<>();
-        new Thread(this).start();
+    public static boolean isReady(String host, int port) {
+        try {
+            InetSocketAddress sa = new InetSocketAddress(host, port);
+            Socket ss = new Socket();
+            ss.connect(sa, 1000);
+            ss.close();
+        } catch(Exception e) {
+            return false;
+        }
+        return true;
     }
 
     @Override
@@ -249,7 +292,7 @@ public class ConsulSpConfig extends SpConfig implements Runnable {
     private String addSn(String key) {
         String configAppendix;
         if (this.baseConfigKeys.contains(key)) {
-            String nodeId = System.getenv(NODE_ID_ENV_KEY);
+            String nodeId = System.getenv(ENV_NODE_CONTROLLER_ID_KEY);
             if (nodeId == null) {
                 //nodeId = PRIMARY_NODE_KEY;
                 configAppendix = BASE_PREFIX + SLASH + PRIMARY_NODE_KEY;
diff --git a/streampipes-connect-container-master/development/env b/streampipes-connect-container-master/development/env
deleted file mode 100644
index e69de29..0000000
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/init/AdapterMasterContainer.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/init/AdapterMasterContainer.java
deleted file mode 100644
index e69de29..0000000
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 0d28466..9de5af6 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -58,7 +58,7 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter {
         app.run();
 
         // check wether pipeline element is managed by node controller
-        if (System.getenv("SP_NODE_ID") != null) {
+        if (System.getenv("SP_NODE_CONTROLLER_ID") != null) {
             // secondary
             // register pipeline element service via node controller
             NodeControllerUtil.register(
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
index d90df3d..a2f1b45 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/ConsulUtil.java
@@ -18,8 +18,7 @@
 
 package org.apache.streampipes.container.util;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.orbitz.consul.AgentClient;
 import com.orbitz.consul.Consul;
 import com.orbitz.consul.HealthClient;
@@ -31,11 +30,12 @@ import com.orbitz.consul.model.health.ServiceHealth;
 import com.orbitz.consul.model.kv.Value;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.StringEntity;
-import org.apache.streampipes.config.consul.ConsulSpConfig;
 import org.apache.streampipes.config.model.ConfigItem;
+import org.apache.streampipes.container.model.PeConfig;
 import org.apache.streampipes.container.model.consul.ConsulServiceRegistrationBody;
 import org.apache.streampipes.container.model.consul.HealthCheckConfiguration;
 import org.apache.streampipes.sdk.helpers.Tuple2;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,104 +49,201 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class ConsulUtil {
 
-  private static final String PROTOCOL = "http://";
+  private static final Logger LOG = LoggerFactory.getLogger(ConsulUtil.class);
 
+  private static final String HTTP_PROTOCOL = "http://";
+  private static final String COLON = ":";
+  private static final String SLASH = "/";
   private static final String HEALTH_CHECK_INTERVAL = "10s";
-  //private static final String HEALTH_CHECK_TTL = "15s";
-  //private static final String CONSUL_DEREGISTER_SERIVER_AFTER = "10s";
-  private static final String PE_SERVICE_NAME = "pe";
-  private static final String NODE_SERVICE_NAME = "node";
-
+  private static final String PE_SVC_TAG = "pe";
+  private static final String NODE_SVC_TAG = "node";
   private static final String CONSUL_ENV_LOCATION = "CONSUL_LOCATION";
+  private static final int CONSUL_DEFAULT_PORT = 8500;
+  private static final String CONSUL_NAMESPACE = "/sp/v1/";
   private static final String CONSUL_URL_REGISTER_SERVICE = "v1/agent/service/register";
-
-  private static final String PRIMARY_PE_IDENTIFIER = "primary";
-  private static final String SECONDARY_PE_IDENTIFIER = "secondary";
-  private static final String NODE_ID_IDENTIFIER = "SP_NODE_ID";
-  private static final String SLASH = "/";
-
-  static Logger LOG = LoggerFactory.getLogger(ConsulUtil.class);
+  private static final String PRIMARY_PE_IDENTIFIER_TAG = "primary";
+  private static final String SECONDARY_PE_IDENTIFIER_TAG = "secondary";
+  private static final String ENV_NODE_CONTROLLER_ID_KEY = "SP_NODE_CONTROLLER_ID";
 
   public static Consul consulInstance() {
     return Consul.builder().withUrl(consulURL()).build();
   }
 
-  public static void registerPeService(String serviceID, String url, int port) {
-    String serviceLocationTag = System.getenv(NODE_ID_IDENTIFIER) == null ? PRIMARY_PE_IDENTIFIER : SECONDARY_PE_IDENTIFIER;
-    String uniquePEServiceId = url + SLASH + serviceID;
-    registerService(PE_SERVICE_NAME, uniquePEServiceId, url, port, Arrays.asList("pe", serviceLocationTag));
+  /**
+   * Method called by {@link org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter} to register
+   * new pipeline element service endpoint.
+   *
+   * @param svcId unique service id
+   * @param host  host address of pipeline element service endpoint
+   * @param port  port of pipeline element service endpoint
+   */
+  public static void registerPeService(String svcId, String host, int port) {
+    registerService(PE_SVC_TAG, makeUniqueSvcId(host, svcId), host, port, makeSvcTags());
   }
 
-  public static void registerService(String serviceName, String serviceID, String url, int port, String tag) {
-    registerService(serviceName, serviceID, url, port, Collections.singletonList(tag));
+  /**
+   * Method called by {@link org.apache.streampipes.node.controller.container.NodeControllerContainerInit} to
+   * register new node controller service endpoint.
+   *
+   * @param svcId unique service id
+   * @param host  host address of node controller service endpoint
+   * @param port  port of node controller service endpoint
+   */
+  public static void registerNodeService(String svcId, String host, int port) {
+    registerService(NODE_SVC_TAG, makeUniqueSvcId(host, svcId), host, port, Collections.singletonList(NODE_SVC_TAG));
   }
 
-  public static void registerService(String serviceName, String serviceID, String url, int port, List<String> tag) {
-    String body = createServiceRegisterBody(serviceName, serviceID, url, port, tag);
-    try {
-      registerServiceHttpClient(body);
-      LOG.info("Register service " + serviceID +" successful");
-    } catch (IOException e) {
-      LOG.error("Register service: " + serviceID, " - " + e.toString());
+  /**
+   * Register service at Consul.
+   *
+   * @param svcGroup  service group for registered service
+   * @param svcId     unique service id
+   * @param host      host address of service endpoint
+   * @param port      port of service endpoint
+   * @param tags      tags of service
+   */
+  public static void registerService(String svcGroup, String svcId, String host, int port, List<String> tags) {
+    boolean connected = false;
+
+    while (!connected) {
+      LOG.info("Trying to register service at Consul: " + svcId);
+      ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(svcGroup, svcId, host, port, tags);
+      connected = registerServiceHttpClient(svcRegistration);
+
+      if (!connected) {
+        LOG.info("Retrying in 1 second");
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
     }
+    LOG.info("Successfully registered service at Consul: " + svcId);
   }
 
-  public static void registerNodeControllerService(String serviceID, String url, int port) {
-    String uniqueNodeServiceId = url + SLASH + serviceID;
-    registerService(NODE_SERVICE_NAME, uniqueNodeServiceId, url, port, "node");
-  }
-
-  //NOT TESTED
- /*   public static void subcribeHealthService() {
-        Consul consul = consulInstance();
-        HealthClient healthClient = consul.healthClient();
-        Agent agent = consul.agentClient().getAgent();
-
+  /**
+   * PUT REST call to Consul API to register new service.
+   *
+   * @param svcRegistration   service registration object used to register service endpoint
+   * @return                  success or failure of service registration
+   */
+  private static boolean registerServiceHttpClient(ConsulServiceRegistrationBody svcRegistration) {
+    try {
+      String endpoint = makeConsulEndpoint();
+      String body = JacksonSerializer.getObjectMapper().writeValueAsString(svcRegistration);
 
-        ServiceHealthCache svHealth = ServiceHealthCache.newCache(healthClient, PE_SERVICE_NAME);
+      Request.Put(endpoint)
+              .addHeader("accept", "application/json")
+              .body(new StringEntity(body))
+              .execute();
 
-        svHealth.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {
-            @Override
-            public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
-                System.out.println("ad");
-            }
-        });
+      return true;
+    } catch (IOException e) {
+      LOG.error("Could not register service at Consul");
     }
-    */
+    return false;
+  }
 
-  public static Map<String, Tuple2<String, String>> getPEServices() {
-    LOG.info("Load PE service status");
+  // GET methods
+
+  /**
+   * Get all pipeline element service endpoints
+   *
+   * @return list of pipline element service endpoints
+   */
+  public static Map<String, Tuple2<String, String>> getPeServices() {
+    LOG.info("Load pipeline element service status");
     Consul consul = consulInstance();
     AgentClient agent = consul.agentClient();
 
     Map<String, Service> services = consul.agentClient().getServices();
     Map<String, HealthCheck> checks = agent.getChecks();
 
-    Map<String, Tuple2<String,String>> peServices = new HashMap<>();
+    Map<String, Tuple2<String,String>> peSvcs = new HashMap<>();
 
     for (Map.Entry<String, Service> entry : services.entrySet()) {
-      if (entry.getValue().getTags().contains(PE_SERVICE_NAME)) {
-        String serviceId = entry.getValue().getId();
-        String serviceStatus = "critical";
-        String serviceTag = "primary";
+      if (entry.getValue().getTags().contains(PE_SVC_TAG)) {
+        String svcId = entry.getValue().getId();
+
+        String svcStatus = "critical";
+        String svcTag = PRIMARY_PE_IDENTIFIER_TAG;
+
         if (checks.containsKey("service:" + entry.getKey())) {
-          serviceStatus = checks.get("service:" + entry.getKey()).getStatus();
-          if (checks.get("service:" + entry.getKey()).getServiceTags().stream().noneMatch(e -> e.contains("primary"))) {
-            serviceTag = "secondary";
+          svcStatus = checks.get("service:" + entry.getKey()).getStatus();
+          if (checks.get("service:" + entry.getKey())
+                  .getServiceTags().stream().noneMatch(e -> e.contains(PRIMARY_PE_IDENTIFIER_TAG))) {
+            svcTag = SECONDARY_PE_IDENTIFIER_TAG;
           }
         }
 
-        LOG.info("Service id: " + serviceId + " service status: " + serviceStatus + " service tag: " + serviceTag);
-        peServices.put(serviceId, new Tuple2(serviceStatus, serviceTag));
+        LOG.info("Service id: " + svcId + " service status: " + svcStatus + " service tag: " + svcTag);
+        peSvcs.put(svcId, new Tuple2<>(svcStatus, svcTag));
       }
     }
-    return peServices;
+    return peSvcs;
+  }
+
+  /**
+   * Get active pipeline element service endpoints
+   *
+   * @return list of pipeline element endpoints
+   */
+  public static List<String> getActivePeEndpoints() {
+    LOG.info("Load active pipeline element service endpoints");
+    return getServiceEndpoints(PE_SVC_TAG, true, Collections.singletonList(PRIMARY_PE_IDENTIFIER_TAG));
   }
 
+  /**
+   * Get active node controller service endpoints
+   *
+   * @return list of active node controller endpoints
+   */
+  public static List<String> getActiveNodeEndpoints() {
+    LOG.info("Load active node service endpoints");
+    return getServiceEndpoints(NODE_SVC_TAG, true, new ArrayList<>());
+  }
+
+
+  /**
+   * Get service endpoints
+   *
+   * @param svcGroup            service group for registered service
+   * @param restrictToHealthy   retrieve healthy or all registered services for a service group
+   * @param filterByTags        filter param to filter list of registered services
+   * @return                    list of services
+   */
+  public static List<String> getServiceEndpoints(String svcGroup, boolean restrictToHealthy,
+                                                 List<String> filterByTags) {
+    Consul consul = consulInstance();
+    HealthClient healthClient = consul.healthClient();
+    List<String> endpoints = new LinkedList<>();
+    List<ServiceHealth> nodes;
+
+    if (!restrictToHealthy) {
+      nodes = healthClient.getAllServiceInstances(svcGroup).getResponse();
+    } else {
+      nodes = healthClient.getHealthyServiceInstances(svcGroup).getResponse();
+    }
+    for (ServiceHealth node : nodes) {
+      if (node.getService().getTags().containsAll(filterByTags)) {
+        String endpoint = node.getService().getAddress() + ":" + node.getService().getPort();
+        LOG.info("Active " + svcGroup + " endpoint: " + endpoint);
+        endpoints.add(endpoint);
+      }
+    }
+    return endpoints;
+  }
+
+  /**
+   * Get key-value entries for a given route
+   *
+   * @param route route to retrieve key-value entries in Consul
+   * @return      key-value entries
+   */
   public static Map<String, String> getKeyValue(String route) {
     Consul consul = consulInstance();
     KeyValueClient keyValueClient = consul.keyValueClient();
@@ -162,147 +259,80 @@ public class ConsulUtil {
         if (value.getValueAsString().isPresent()) {
           v = value.getValueAsString().get();
         }
-        LOG.info("Load key: " + route + " value: " + v);
         keyValues.put(key, v);
       }
     }
     return keyValues;
   }
 
-
-  public static int getIntValue(String route) {
-    String value = getKeyValue(route)
-            .values()
-            .stream()
-            .findFirst()
-            .get();
-
-//    List<String> list = ConsulUtil.getKeyValue(route)
-//            .entrySet()
-//            .stream()
-//            .map(m -> {
-//              return new Gson().fromJson(m.getValue(), ConfigItem.class).getValue();
-//            })
-//            .collect(Collectors.toList());
-
-    return Integer.parseInt(new Gson().fromJson(value, ConfigItem.class).getValue());
+  /**
+   * Get specific value for a key in route
+   *
+   * @param route   route to retrieve value
+   * @param type    data type of return value, e.g. Integer.class, String.class
+   * @return        value for key
+   */
+  public static <T> T getValueForRoute(String route, Class<T> type) {
+    try {
+      String entry = getKeyValue(route)
+              .values()
+              .stream()
+              .findFirst()
+              .orElse(null);
+
+      if (type.equals(Integer.class)) {
+        return (T) Integer.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+      } else if (type.equals(Boolean.class)) {
+        return (T) Boolean.valueOf(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+      } else {
+        return type.cast(JacksonSerializer.getObjectMapper().readValue(entry, ConfigItem.class).getValue());
+      }
+    } catch (JsonProcessingException e) {
+      e.printStackTrace();
+    }
+    throw new IllegalArgumentException("Cannot get entry from Consul");
   }
 
-  public static String getStringValue(String route) {
-    String value = getKeyValue(route)
-            .values()
-            .stream()
-            .findFirst()
-            .get();
-
-    return new Gson().fromJson(value, ConfigItem.class).getValue();
-  }
+  // Update
 
+  /**
+   * Update key-value config in Consul
+   *
+   * @param key       key to be updated
+   * @param entry     new entry
+   * @param password  wether value is a password, here only non-sensitive values are updated
+   */
   public static void updateConfig(String key, String entry, boolean password) {
     Consul consul = consulInstance();
-    KeyValueClient keyValueClient = consul.keyValueClient();
-
     if (!password) {
-      keyValueClient.putValue(key, entry);
+      LOG.info("Updated config - key:" + key + " value: " + entry);
+      consul.keyValueClient().putValue(key, entry);
     }
-
-//        keyValueClient.putValue(key + "_description", description);
-//        keyValueClient.putValue(key + "_type", valueType);
-    LOG.info("Updated config - key:" + key +
-            " value: " + entry);
-//        +
-//                " description: " + description +
-//                " type: " + valueType);
   }
 
-  public static List<String> getActivePEServicesEndPoints() {
-    LOG.info("Load active PE service endpoints");
-    return getServiceEndpoints(PE_SERVICE_NAME, true, Collections.singletonList(PRIMARY_PE_IDENTIFIER));
-  }
-
-  public static List<String> getActiveNodeEndpoints() {
-    LOG.info("Load active node service endpoints");
-    // TODO set restrictToHealthy to true, this is just for debugging
-    return getServiceEndpoints(NODE_SERVICE_NAME, false, new ArrayList<>());
-  }
-
-  public static List<String> getServiceEndpoints(String serviceGroup, boolean restrictToHealthy,
-                                                 List<String> filterByTags) {
+  /**
+   * Deregister registered service endpoint in Consul
+   *
+   * @param svcId     service id of endpoint to be deregistered
+   */
+  public static void deregisterService(String svcId) {
     Consul consul = consulInstance();
-    HealthClient healthClient = consul.healthClient();
-    List<String> endpoints = new LinkedList<>();
-
-    List<ServiceHealth> nodes;
-    if (!restrictToHealthy) {
-      nodes = healthClient.getAllServiceInstances(serviceGroup).getResponse();
-    } else {
-      nodes = healthClient.getHealthyServiceInstances(serviceGroup).getResponse();
-    }
-    for (ServiceHealth node : nodes) {
-      if (node.getService().getTags().containsAll(filterByTags)) {
-        String endpoint = node.getService().getAddress() + ":" + node.getService().getPort();
-        LOG.info("Active " + serviceGroup + " endpoint: " + endpoint);
-        endpoints.add(endpoint);
-      }
-    }
-    return endpoints;
+    LOG.info("Deregister service: " + svcId);
+    consul.agentClient().deregister(svcId);
   }
 
-  public static void deregisterService(String serviceId) {
+  /**
+   * Delete config in Consul
+   *
+   * @param key     key to be deleted
+   */
+  public static void deleteConfig(String key) {
     Consul consul = consulInstance();
-
-    LOG.info("Deregister Service: " + serviceId);
-    consul.agentClient().deregister(serviceId);
+    LOG.info("Delete config: {}", key);
+    consul.keyValueClient().deleteKeys(CONSUL_NAMESPACE + key);
   }
 
-  public static void deleteKeys(String serviceId) {
-    Consul consul = consulInstance();
-
-    LOG.info("Delete keys: {}", serviceId);
-    // TODO: namespace should not be hardcoded
-    consul.keyValueClient().deleteKeys("/sp/v1/" + serviceId);
-  }
-
-  private static int registerServiceHttpClient(String body) throws IOException {
-    return Request.Put(consulURL().toString() + "/" + CONSUL_URL_REGISTER_SERVICE)
-            .addHeader("accept", "application/json")
-            .body(new StringEntity(body))
-            .execute()
-            .returnResponse()
-            .getStatusLine().getStatusCode();
-  }
-
-  private static String createServiceRegisterBody(String name, String id, String url, int port, List<String> tags) {
-    String healthCheckURL = PROTOCOL + url + ":" + port;
-    ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
-    body.setID(id);
-    body.setName(name);
-    body.setTags(tags);
-    body.setAddress(PROTOCOL + url);
-    body.setPort(port);
-    body.setEnableTagOverride(true);
-    body.setCheck(new HealthCheckConfiguration("GET", healthCheckURL, HEALTH_CHECK_INTERVAL));
-
-    return new Gson().toJson(body);
-
-//    return "{" +
-//            "\"ID\": \"" + id + "\"," +
-//            "\"Name\": \"" + name + "\"," +
-//            "\"Tags\": [" +
-//            "    \"" + tag + "\"" + ",\"urlprefix-/" + id + " strip=/" + id + "\"" +
-//            " ]," +
-//            " \"Address\": \"" + PROTOCOL + url + "\"," +
-//            " \"Port\":" + port + "," +
-//            " \"EnableTagOverride\": true" + "," +
-//            "\"Check\": {" +
-//            " \"Method\": \"GET\"" + "," +
-//            " \"http\":" + "\"" + healthCheckURL + "\"," +
-//            //  " \"DeregisterCriticalServiceAfter\":" +  "\"" + CONSUL_DEREGISTER_SERIVER_AFTER + "\"," +
-//            " \"interval\":" + "\"" + HEALTH_CHECK_INTERVAL + "\"" + //"," +
-//            //" \"TTL\":" + "\"" + HEALTH_CHECK_TTL + "\"" +
-//            " }" +
-//            "}";
-  }
+  // Helper methods
 
   private static URL consulURL() {
     Map<String, String> env = System.getenv();
@@ -310,17 +340,45 @@ public class ConsulUtil {
 
     if (env.containsKey(CONSUL_ENV_LOCATION)) {
       try {
-        url = new URL("http", env.get(CONSUL_ENV_LOCATION), 8500, "");
+        url = new URL("http", env.get(CONSUL_ENV_LOCATION), CONSUL_DEFAULT_PORT, "");
       } catch (MalformedURLException e) {
         e.printStackTrace();
       }
     } else {
       try {
-        url = new URL("http", "localhost", 8500, "");
+        url = new URL("http", "localhost", CONSUL_DEFAULT_PORT, "");
       } catch (MalformedURLException e) {
         e.printStackTrace();
       }
     }
     return url;
   }
+
+  private static ConsulServiceRegistrationBody createRegistrationBody(String svcGroup, String id, String host,
+                                                                      int port, List<String> tags) {
+    ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
+    body.setID(id);
+    body.setName(svcGroup);
+    body.setTags(tags);
+    body.setAddress(HTTP_PROTOCOL + host);
+    body.setPort(port);
+    body.setEnableTagOverride(true);
+    body.setCheck(new HealthCheckConfiguration("GET",
+            (HTTP_PROTOCOL + host + COLON + port), HEALTH_CHECK_INTERVAL));
+
+    return body;
+  }
+
+  private static String makeUniqueSvcId(String host, String serviceID) {
+    return host + SLASH + serviceID;
+  }
+
+  private static List<String> makeSvcTags() {
+    return Arrays.asList(PE_SVC_TAG, System.getenv(ENV_NODE_CONTROLLER_ID_KEY) == null ?
+            PRIMARY_PE_IDENTIFIER_TAG : SECONDARY_PE_IDENTIFIER_TAG);
+  }
+
+  private static String makeConsulEndpoint() {
+    return consulURL().toString() + "/" + CONSUL_URL_REGISTER_SERVICE;
+  }
 }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
index fd99736..db959d2 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
@@ -33,28 +33,27 @@ import java.util.*;
 public class NodeControllerUtil {
     static Logger LOG = LoggerFactory.getLogger(NodeControllerUtil.class);
 
-    private static final String PROTOCOL = "http://";
+    private static final String HTTP_PROTOCOL = "http://";
     private static final String COLON = ":";
     private static final String SLASH = "/";
     private static final String HEALTH_CHECK_INTERVAL = "10s";
-    private static final String PE_SERVICE_NAME = "pe";
-    private static final String PE_SECONDARY_TAG = "secondary";
-    private static final String NODE_CONTROLLER_REGISTER_SVC_URL = "node/container/register";
+    private static final String PE_TAG = "pe";
+    private static final String SECONDARY_PE_IDENTIFIER_TAG = "secondary";
+    private static final String NODE_CONTROLLER_REGISTER_SVC_URL = "api/v2/node/container/register";
 
     public static void register(String serviceID, String host, int port,
                                 Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
-        register(PE_SERVICE_NAME, makeSvcId(host, serviceID), host, port,
-                Arrays.asList(PE_SERVICE_NAME, PE_SECONDARY_TAG), epaDeclarers);
+        register(PE_TAG, makeSvcId(host, serviceID), host, port,
+                Arrays.asList(PE_TAG, SECONDARY_PE_IDENTIFIER_TAG), epaDeclarers);
     }
 
-    public static void register(String svcName, String svcId, String url, int port, List<String> tag,
+    public static void register(String svcName, String svcId, String host, int port, List<String> tag,
                                 Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
-
         boolean connected = false;
 
         while (!connected) {
             LOG.info("Trying to register pipeline element container at node controller: " + makeRegistrationEndpoint());
-            String body = createSvcBody(svcName, svcId, url, port, tag, epaDeclarers);
+            String body = createSvcBody(svcName, svcId, host, port, tag, epaDeclarers);
             connected = registerSvcHttpClient(body);
 
             if (!connected) {
@@ -84,15 +83,15 @@ public class NodeControllerUtil {
         return false;
     }
 
-    private static String createSvcBody(String name, String id, String url, int port, List<String> tags,
+    private static String createSvcBody(String name, String id, String host, int port, List<String> tags,
                                         Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
         try {
             ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
-            String healthCheckURL = PROTOCOL + url + COLON + port;
+            String healthCheckURL = HTTP_PROTOCOL + host + COLON + port;
             body.setID(id);
             body.setName(name);
             body.setTags(tags);
-            body.setAddress(PROTOCOL + url);
+            body.setAddress(HTTP_PROTOCOL + host);
             body.setPort(port);
             body.setEnableTagOverride(true);
             body.setCheck(new HealthCheckConfiguration("GET", healthCheckURL, HEALTH_CHECK_INTERVAL));
@@ -110,14 +109,14 @@ public class NodeControllerUtil {
 
     private static String makeRegistrationEndpoint() {
         if (System.getenv("SP_NODE_CONTROLLER_HOST") != null) {
-            return PROTOCOL
+            return HTTP_PROTOCOL
                     + System.getenv("SP_NODE_CONTROLLER_HOST")
                     + COLON
                     + System.getenv("SP_NODE_CONTROLLER_PORT")
                     + SLASH
                     + NODE_CONTROLLER_REGISTER_SVC_URL;
         } else {
-            return PROTOCOL
+            return HTTP_PROTOCOL
                     + "localhost"
                     + COLON
                     + "7077"
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
index 3d86c5b..8d38c28 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java
@@ -129,25 +129,6 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity
     }
   }
 
-//  TODO: delete if not needed after merge
-//  public InvocableStreamPipesEntity(ConsumableStreamPipesEntity entityDescription) {
-//    super();
-//    this.setName(entityDescription.getName());
-//    this.setDescription(entityDescription.getDescription());
-//    this.setIconUrl(entityDescription.getIconUrl());
-//    this.setInputStreams(entityDescription.getSpDataStreams());
-//    this.setSupportedGrounding(entityDescription.getSupportedGrounding());
-//    this.setStaticProperties(entityDescription.getStaticProperties());
-//    this.setBelongsTo(entityDescription.getElementId());
-//    this.setStreamRequirements(entityDescription.getSpDataStreams());
-//    this.setAppId(entityDescription.getAppId());
-//    this.setIncludesAssets(entityDescription.isIncludesAssets());
-//
-//    this.setElementEndpointHostname(entityDescription.getElementEndpointHostname());
-//    this.setElementEndpointPort(entityDescription.getElementEndpointPort());
-//    this.setElementEndpointServiceName(entityDescription.getElementEndpointServiceName());
-//  }
-
   public InvocableStreamPipesEntity(String uri, String name, String description, String iconUrl) {
     super(uri, name, description, iconUrl);
     this.configured = false;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index fbaafe7..e224806 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -71,6 +71,9 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
     this.setAppId(sepa.getAppId());
     this.setIncludesAssets(sepa.isIncludesAssets());
     this.setElementId(RdfIdGenerator.makeRdfId(this));
+    this.setElementEndpointServiceName(sepa.getElementEndpointServiceName());
+    this.setElementEndpointHostname(sepa.getElementEndpointHostname());
+    this.setElementEndpointPort(sepa.getElementEndpointPort());
     //this.setUri(belongsTo +"/" +getElementId());
   }
 
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
index a07dc21..17449f7 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
@@ -62,6 +62,9 @@ public class DataSinkInvocation extends InvocableStreamPipesEntity {
     this.setAppId(sec.getAppId());
     this.setIncludesAssets(sec.isIncludesAssets());
     this.setElementId(RdfIdGenerator.makeRdfId(this));
+    this.setElementEndpointServiceName(sec.getElementEndpointServiceName());
+    this.setElementEndpointHostname(sec.getElementEndpointHostname());
+    this.setElementEndpointPort(sec.getElementEndpointPort());
     //this.setUri(belongsTo +"/" +getElementId());
   }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
index ece533b..4ff12ac 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,16 +15,15 @@ package org.apache.streampipes.node.controller.container;
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container;
 
 import org.apache.streampipes.container.util.ConsulUtil;
-import org.apache.streampipes.model.node.PipelineElementDockerContainer;
 import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
-import org.apache.streampipes.node.controller.container.management.pe.PipelineElementManager;
 import org.apache.streampipes.node.controller.container.rest.NodeControllerResourceConfig;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
-import org.apache.streampipes.node.controller.container.management.node.NodeJanitorManager;
-import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
+import org.apache.streampipes.node.controller.container.management.janitor.NodeJanitorManager;
+import org.apache.streampipes.node.controller.container.management.resources.ResourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.SpringApplication;
@@ -67,7 +65,7 @@ public class NodeControllerContainerInit {
         }
 
         // registration with consul here
-        ConsulUtil.registerNodeControllerService(
+        ConsulUtil.registerNodeService(
                 nodeConfig.getNodeServiceId(),
                 nodeConfig.getNodeHostName(),
                 nodeConfig.getNodeControllerPort()
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
index 139d88d..4368d6d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.config;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.config;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.config;
 
 import org.apache.streampipes.config.SpConfig;
 import org.apache.streampipes.model.node.resources.interfaces.AccessibleSensorActuatorResource;
@@ -29,7 +30,6 @@ public enum NodeControllerConfig {
 
     private static final String SLASH = "/";
     private static final String NODE_SERVICE_ID = "node/org.apache.streampipes.node.controller";
-
     private static final String DEFAULT_NODE_CONTROLLER_ID = "node-controller";
     private static final int DEFAULT_NODE_CONTROLLER_PORT = 7077;
     private static final String DEFAULT_NODE_TYPE = "edge";
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
index e049a80..afb797d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management;
 
 public interface IRunningInstances<T> {
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
index 8c6f628..05f5278 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.info;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.info;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.info;
 
 import org.apache.streampipes.model.node.*;
 import org.apache.streampipes.model.node.resources.hardware.HardwareResource;
@@ -46,7 +47,7 @@ public class NodeInfoStorage {
 
     private NodeInfo nodeInfo = new NodeInfo();
 
-    private static DockerInfo DockerInfo = DockerUtils.getInstance().getDockerInfo();
+    private static final DockerInfo DockerInfo = DockerUtils.getInstance().getDockerInfo();
     // OSHI to retreive system information
     private static SystemInfo si = new SystemInfo();
     private static HardwareAbstractionLayer hal = si.getHardware();
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java
similarity index 99%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java
index 74deddc..31aee7f 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.node;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.node;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.janitor;
 
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerUtils;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
index 58a88b1..e3a09a6 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.orchestrator;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.orchestrator
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.orchestrator;
 
 import org.apache.streampipes.model.node.PipelineElementDockerContainer;
 import org.apache.streampipes.node.controller.container.management.IRunningInstances;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
index 943a5bd..d45afcb 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.orchestrator
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.gson.Gson;
@@ -97,7 +98,7 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
 
             // deregister and delete kv pair in service in consul
             ConsulUtil.deregisterService(p.getServiceId());
-            ConsulUtil.deleteKeys(p.getServiceId());
+            ConsulUtil.deleteConfig(p.getServiceId());
 
             ImmutableMap<String, ? extends Serializable> m = ImmutableMap.of(
                     "message",
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java
index ed5dd38..b58ba5c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.orchestrator
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
 
 import org.apache.streampipes.model.node.PipelineElementDockerContainer;
 import org.apache.streampipes.model.node.PipelineElementDockerContainerBuilder;
@@ -39,8 +40,8 @@ public enum DockerNodeContainer {
                         .withName("streampipes_pipeline-elements-all-jvm")
                         .withExposedPorts(new String[]{"7023"})
                         .withEnvironmentVariables(Arrays.asList(
-                                "SP_NODE_ID=" + NodeControllerConfig.INSTANCE.getNodeBrokerHost(),
-                                "SP_NODE_CONTROLLER_HOST=" + NodeControllerConfig.INSTANCE.getNodeControllerId(),
+                                "SP_NODE_CONTROLLER_ID=" + NodeControllerConfig.INSTANCE.getNodeControllerId(),
+                                "SP_NODE_CONTROLLER_HOST=" + NodeControllerConfig.INSTANCE.getNodeHostName(),
                                 "SP_NODE_CONTROLLER_PORT=" + NodeControllerConfig.INSTANCE.getNodeControllerPort()
                         ))
                         .withLabels(new HashMap<String,String>(){{
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
index 8793685..3a0a10c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.orchestrator
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
 
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/ElementLifeCyle.java
similarity index 74%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
copy to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/ElementLifeCyle.java
index e049a80..7d0a0bb 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/ElementLifeCyle.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,14 +15,18 @@ package org.apache.streampipes.node.controller.container.management;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.pe;
 
-public interface IRunningInstances<T> {
+import org.apache.streampipes.container.model.node.InvocableRegistration;
 
-    void add(String id, T value);
+public interface ElementLifeCyle {
 
-    boolean isRunning(String id);
+    void register(InvocableRegistration registration);
 
-    T get(String id);
+    String invoke(String endpoint, String payload);
+
+    String detach(String runningInstanceId);
+
+    void unregister();
 
-    void remove(String id);
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
similarity index 59%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index cc648f4..0cb079e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.pe;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,7 +15,9 @@ package org.apache.streampipes.node.controller.container.management.pe;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.pe;
 
+import com.google.gson.Gson;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
@@ -29,84 +31,93 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collections;
 
-public class PipelineElementManager {
+public class InvocableElementManager implements ElementLifeCyle {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(PipelineElementManager.class.getCanonicalName());
+            LoggerFactory.getLogger(InvocableElementManager.class.getCanonicalName());
 
-    private static final String PROTOCOL = "http://";
+    private static final String HTTP_PROTOCOL = "http://";
     private static final String COLON = ":";
     private static final String SLASH = "/";
     private static final String ENV_CONSUL_LOCATION = "CONSUL_LOCATION";
     private static final Integer CONNECT_TIMEOUT = 10000;
-    private static PipelineElementManager instance = null;
+    private static InvocableElementManager instance = null;
 
-    private PipelineElementManager() {}
+    private InvocableElementManager() {}
 
-    public static PipelineElementManager getInstance() {
+    public static InvocableElementManager getInstance() {
         if (instance == null) {
-            synchronized (PipelineElementManager.class) {
+            synchronized (InvocableElementManager.class) {
                 if (instance == null)
-                    instance = new PipelineElementManager();
+                    instance = new InvocableElementManager();
             }
         }
         return instance;
     }
 
-    /**
-     * Register pipeline element container
-     *
-     * @param invocableRegistration
-     */
-    public void registerPipelineElements(InvocableRegistration invocableRegistration) {
+    @Override
+    public void register(InvocableRegistration registration) {
         try {
             Request.Put(makeConsulRegistrationEndpoint())
                     .addHeader("accept", "application/json")
                     .body(new StringEntity(JacksonSerializer
                             .getObjectMapper()
-                            .writeValueAsString(invocableRegistration.getConsulServiceRegistrationBody())))
+                            .writeValueAsString(registration.getConsulServiceRegistrationBody())))
                     .execute();
 
             // TODO: persistent storage to survive failures
             NodeInfoStorage.getInstance()
                     .retrieveNodeInfo()
-                    .setSupportedPipelineElementAppIds(invocableRegistration.getSupportedPipelineElementAppIds());
+                    .setSupportedPipelineElementAppIds(registration.getSupportedPipelineElementAppIds());
         } catch (IOException e) {
             e.printStackTrace();
         }
     }
 
-    /**
-     * Invoke pipeline elements when pipeline is started
-     *
-     * @param pipelineElementEndpoint
-     * @param payload
-     * @return
-     */
-    public String invokePipelineElement(String pipelineElementEndpoint, String payload) {
-        LOG.info("Invoking element: {}", pipelineElementEndpoint);
+    @Override
+    public String invoke(String endpoint, String payload) {
+        LOG.info("Invoke pipeline element: {}", endpoint);
         try {
             Response httpResp = Request
-                    .Post(pipelineElementEndpoint)
+                    .Post(endpoint)
                     .bodyString(payload, ContentType.APPLICATION_JSON)
                     .connectTimeout(CONNECT_TIMEOUT)
                     .execute();
-            return httpResp.toString();
+
+            String resp = httpResp.returnContent().asString();
+            org.apache.streampipes.model.Response streamPipesResp = new Gson().fromJson(resp,
+                    org.apache.streampipes.model.Response.class);
+
+            return streamPipesResp.toString();
         } catch (Exception e) {
             LOG.error(e.getMessage());
         }
-        return "";
+        throw new IllegalArgumentException("Failed to invoke pipeline element: " + endpoint);
     }
 
-    /**
-     * detaches pipeline elements when pipeline is stopped
-     */
-    // TODO: implement detach pe logic
-    public void detachPipelineElement() {
+    @Override
+    public String detach(String endpoint) {
+        LOG.info("Detach pipeline element: {}", endpoint);
+        try {
+            Response httpResp = Request
+                    .Delete(endpoint)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+
+            String resp = httpResp.returnContent().asString();
+            org.apache.streampipes.model.Response streamPipesResp = new Gson().fromJson(resp,
+                    org.apache.streampipes.model.Response.class);
 
+            return streamPipesResp.toString();
+        } catch (Exception e) {
+            LOG.error(e.getMessage());
+        }
+        throw new IllegalArgumentException("Failed to detach pipeline element: " + endpoint);
     }
 
-    public void unregisterPipelineElements(){
+    @Override
+    public void unregister(){
+        // TODO: unregister element from Consul and
         NodeInfoStorage.getInstance()
                 .retrieveNodeInfo()
                 .setSupportedPipelineElementAppIds(Collections.emptyList());
@@ -114,14 +125,14 @@ public class PipelineElementManager {
 
     private String makeConsulRegistrationEndpoint() {
         if (System.getenv(ENV_CONSUL_LOCATION) != null) {
-            return PROTOCOL
+            return HTTP_PROTOCOL
                     + System.getenv(ENV_CONSUL_LOCATION)
                     + COLON
                     + "8500"
                     + SLASH
                     + "v1/agent/service/register";
         } else {
-            return PROTOCOL
+            return HTTP_PROTOCOL
                     + "localhost"
                     + COLON
                     + "8500"
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
similarity index 66%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
copy to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
index 09e373f..3067f71 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,35 +15,36 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.pe;
 
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.node.controller.container.management.IRunningInstances;
 
 import java.util.HashMap;
 import java.util.Map;
 
-public enum RunningRelayInstances implements IRunningInstances<EventRelayManager> {
+public enum RunningInvocableInstances implements IRunningInstances<InvocableStreamPipesEntity> {
     INSTANCE;
 
-    private final Map<String, EventRelayManager> runningInstances = new HashMap<>();
+    private final Map<String, InvocableStreamPipesEntity> runningInvocableInstances = new HashMap<>();
 
-    // TODO: persist active relays to support failure handling
     @Override
-    public void add(String id, EventRelayManager eventRelayManager) {
-        runningInstances.put(id, eventRelayManager);
+    public void add(String id, InvocableStreamPipesEntity value) {
+        runningInvocableInstances.put(id, value);
     }
 
     @Override
     public boolean isRunning(String id) {
-        return runningInstances.get(id) != null;
+        return runningInvocableInstances.get(id) != null;
     }
 
     @Override
-    public EventRelayManager get(String id) {
-        return isRunning(id) ? runningInstances.get(id) : null;
+    public InvocableStreamPipesEntity get(String id) {
+        return runningInvocableInstances.get(id);
     }
 
     @Override
     public void remove(String id) {
-        runningInstances.remove(id);
+        runningInvocableInstances.remove(id);
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaBridge.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaBridge.java
index af15274..43993e2 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaBridge.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaBridge.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.relay;
 
 import com.google.gson.Gson;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaConnector.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaConnector.java
index 1d59eb3..2f04f6e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaConnector.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaConnector.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.relay;
 
 import com.google.gson.Gson;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaRelay.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaRelay.java
index 41ff66d..768cdc2 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaRelay.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/AbstractMqttKafkaRelay.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.relay;
 
 import org.apache.kafka.common.KafkaException;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java
index d3a7af5..c8d0346 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/EventRelayManager.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.relay;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/MqttKafkaBridge.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/MqttKafkaBridge.java
index 755715c..d91c151 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/MqttKafkaBridge.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/MqttKafkaBridge.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.relay;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
@@ -28,7 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 
-
 public class MqttKafkaBridge extends AbstractMqttKafkaConnector implements EventRelay<MqttTransportProtocol, KafkaTransportProtocol> {
     private final static Logger LOG = LoggerFactory.getLogger(MqttKafkaBridge.class);
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index 09e373f..fc56a57 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.relay;
 
 import org.apache.streampipes.node.controller.container.management.IRunningInstances;
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Metrics.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Metrics.java
index 621be48..4d2dd34 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Metrics.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Metrics.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay.model;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay.model;
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.relay.model;
 
 public class Metrics {
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Relay.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Relay.java
index 9a90aca..450cb33 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Relay.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/model/Relay.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay.model;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.relay.model;
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.relay.model;
 
 public class Relay {
     private final String sourceHost;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resources/ResourceManager.java
similarity index 99%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resources/ResourceManager.java
index 0affe45..c34960d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resources/ResourceManager.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.resource;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.resource;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.management.resources;
 
 import com.google.gson.Gson;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
index 00dface..45bbe39 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.rest;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.rest;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.rest;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
@@ -24,11 +25,12 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.core.Response;
 
+@Path("/api/v2/relay")
 public class DebugRelayResource extends AbstractNodeContainerResource {
 
     // TODO: Debug-only.
     @POST
-    @Path("/relay/start")
+    @Path("/start")
     public Response debugRelayEventStream(String msg) throws SpRuntimeException {
         // TODO implement
 
@@ -41,7 +43,7 @@ public class DebugRelayResource extends AbstractNodeContainerResource {
     }
 
     @POST
-    @Path("/relay/stop")
+    @Path("/stop")
     public Response debugStopRelayEventStream(String msg) throws SpRuntimeException {
         // TODO implement
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
index 40b3812..c02539c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InfoStatusResource.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container.rest;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,8 +15,10 @@ package org.apache.streampipes.node.controller.container.rest;
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.rest;
+
 import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
-import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
+import org.apache.streampipes.node.controller.container.management.resources.ResourceManager;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -25,7 +26,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-@Path("/node")
+@Path("/api/v2/node")
 public class InfoStatusResource extends AbstractNodeContainerResource{
 
     @GET
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
similarity index 67%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
index e1e302c..f1823fd 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
@@ -17,16 +17,15 @@
  */
 package org.apache.streampipes.node.controller.container.rest;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.container.model.node.InvocableRegistration;
 import org.apache.streampipes.container.transform.Transformer;
-import org.apache.streampipes.container.util.Util;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.node.PipelineElementDockerContainer;
 import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
-import org.apache.streampipes.node.controller.container.management.pe.PipelineElementManager;
+import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
+import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
 import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
 import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
@@ -38,9 +37,9 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 
-@Path("/node/container")
-public class InvocableManagementResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
-    private static final Logger LOG = LoggerFactory.getLogger(InvocableManagementResource.class.getCanonicalName());
+@Path("/api/v2/node/container")
+public class InvocableEntityResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
+    private static final Logger LOG = LoggerFactory.getLogger(InvocableEntityResource.class.getCanonicalName());
 
     @GET
     @Produces(MediaType.APPLICATION_JSON)
@@ -64,7 +63,7 @@ public class InvocableManagementResource<I extends InvocableStreamPipesEntity> e
                     .readValue(body, InvocableRegistration.class);
 
             // register pipeline elements at consul and node controller
-            PipelineElementManager.getInstance().registerPipelineElements(invocableRegistration);
+            InvocableElementManager.getInstance().register(invocableRegistration);
             LOG.info("Sucessfully registered pipeline element container");
 
         } catch (IOException e) {
@@ -76,67 +75,68 @@ public class InvocableManagementResource<I extends InvocableStreamPipesEntity> e
     @Path("/invoke/{identifier}/{elementId}")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public Response invoke(@PathParam("identifier") String identifier, @PathParam("elementId") String elementId, String payload) {
+    public String invoke(@PathParam("identifier") String identifier,
+                           @PathParam("elementId") String elementId, String payload) {
 
         // TODO implement
-        String pipelineElementEndpoint;
+        String endpoint;
         InvocableStreamPipesEntity graph;
         try {
             if (identifier.equals("sepa")) {
                 graph = Transformer.fromJsonLd(DataProcessorInvocation.class, payload);
+                endpoint = graph.getBelongsTo();
 
                 // TODO: start event relay to remote broker
 //                EventRelayManager eventRelayManager = new EventRelayManager(graph);
 //                eventRelayManager.start();
 //                RunningRelayInstances.INSTANCE.add(eventRelayManager.getRelayedTopic(), eventRelayManager);
 
-                PipelineElementManager.getInstance().invokePipelineElement(graph.getBelongsTo(), payload);
-            }
-            else if (identifier.equals("sec")) {
+                RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
+
+                String resp = InvocableElementManager.getInstance().invoke(endpoint, payload);
+                return resp;
+
+            } else if (identifier.equals("sec")) {
                 graph = Transformer.fromJsonLd(DataSinkInvocation.class, payload);
-                pipelineElementEndpoint = graph.getBelongsTo();
-                PipelineElementManager.getInstance().invokePipelineElement(pipelineElementEndpoint, payload);
+                endpoint = graph.getBelongsTo();
+
+                InvocableElementManager.getInstance().invoke(endpoint, payload);
 
             }
-            //pipelineElementEndpoint = graph.getElementEndpointHostname() + COLON + graph.getElementEndpointPort() + "/" + identifier + "/" + elementId;
         } catch (IOException e) {
             e.printStackTrace();
         }
-        return ok();
+        return "";
     }
 
-    // TODO move endpoint to /elementId/instances/runningInstanceId
     @DELETE
-    @Path("{elementId}/{runningInstanceId}")
+    @Path("/detach/{identifier}/{elementId}/{runningInstanceId}")
     @Produces(MediaType.APPLICATION_JSON)
-    public String detach(@PathParam("elementId") String elementId, @PathParam("runningInstanceId") String runningInstanceId) {
+    public String detach(@PathParam("identifier") String identifier, @PathParam("elementId") String elementId,
+                         @PathParam("runningInstanceId") String runningInstanceId) {
 
         LOG.info("receive stop request elementId={}, runningInstanceId={}", elementId, runningInstanceId);
 
-        return Util.toResponseString(elementId, false, "Could not find the running instance with id: " + runningInstanceId);
-    }
+        // TODO store host and port locally to retrieve by runningInstanceId
 
-    @POST
-    @Path("/detach")
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response detachPipelineElement(String appId) throws SpRuntimeException {
-        // TODO implement
+        String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
 
-        // TODO: stop event relay to remote broker
-        EventRelayManager relay = RunningRelayInstances.INSTANCE.get(appId);
-        assert relay != null;
-        relay.stop();
-        RunningRelayInstances.INSTANCE.remove(appId);
+        String resp = InvocableElementManager.getInstance().detach(endpoint + "/" + runningInstanceId);
+        RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
+        return resp;
 
-        return ok();
+        // TODO: stop event relay to remote broker
+//        EventRelayManager relay = RunningRelayInstances.INSTANCE.get(elementId);
+//        assert relay != null;
+//        relay.stop();
+//        RunningRelayInstances.INSTANCE.remove(elementId);
     }
 
     @DELETE
     @Path("/remove")
     @Consumes(MediaType.APPLICATION_JSON)
     public Response removePipelineElementContainer(PipelineElementDockerContainer container) {
-        PipelineElementManager.getInstance().unregisterPipelineElements();
+        InvocableElementManager.getInstance().unregister();
         return ok(DockerContainerOrchestrator.getInstance().remove(container));
     }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
index 2f6f06d..a18816c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container.rest;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,6 +15,7 @@ package org.apache.streampipes.node.controller.container.rest;
  * limitations under the License.
  *
  */
+package org.apache.streampipes.node.controller.container.rest;
 
 import org.glassfish.jersey.server.ResourceConfig;
 import org.springframework.stereotype.Component;
@@ -26,7 +26,7 @@ public class NodeControllerResourceConfig extends ResourceConfig {
     public NodeControllerResourceConfig() {
         register(HealthCheckResource.class);
         register(InfoStatusResource.class);
-        register(InvocableManagementResource.class);
+        register(InvocableEntityResource.class);
 
         // TODO remove later - only for local relay tests
         register(DebugRelayResource.class);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
index 3604171..7959ce6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
@@ -30,7 +30,7 @@ import java.util.stream.Stream;
 public class EndpointFetcher {
 
   public List<RdfEndpoint> getEndpoints() {
-    List<String> endpoints = ConsulUtil.getActivePEServicesEndPoints();
+    List<String> endpoints = ConsulUtil.getActivePeEndpoints();
     List<RdfEndpoint> servicerdRdfEndpoints = new LinkedList<>();
 
     for (String endpoint : endpoints) {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DataSetEntityUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DataSetEntityUrlGenerator.java
index 0e9a3eb..8c7db7f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DataSetEntityUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DataSetEntityUrlGenerator.java
@@ -21,19 +21,19 @@ import org.apache.streampipes.model.SpDataSet;
 
 public class DataSetEntityUrlGenerator extends EndpointUrlGenerator<SpDataSet> {
 
-    public DataSetEntityUrlGenerator(SpDataSet pipelineElement) {
-        super(pipelineElement);
+    public DataSetEntityUrlGenerator(SpDataSet graph) {
+        super(graph);
     }
 
     @Override
     public String generateInvokeEndpoint() {
-        return pipelineElement.getElementId();
+        return graph.getElementId();
     }
 
     @Override
     public String generateDetachEndpoint() {
-        return pipelineElement.getElementId()
+        return graph.getElementId()
                 + SLASH
-                + pipelineElement.getDatasetInvocationId() ;
+                + graph.getDatasetInvocationId() ;
     }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/EndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/EndpointUrlGenerator.java
index 4222c77..c05ed6e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/EndpointUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/EndpointUrlGenerator.java
@@ -21,13 +21,14 @@ import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 
 public abstract class EndpointUrlGenerator<T extends NamedStreamPipesEntity> {
 
+    protected static final String HTTP_PROTOCOL = "http://";
+    protected static final String COLON = ":";
     protected static final String SLASH = "/";
-    protected static final String URLPREFIX = "http://";
 
-    protected T pipelineElement;
+    protected T graph;
 
-    public EndpointUrlGenerator(T pipelineElement) {
-        this.pipelineElement = pipelineElement;
+    public EndpointUrlGenerator(T graph) {
+        this.graph = graph;
     }
 
     public abstract String generateInvokeEndpoint();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
index 9fec987..814f026 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
@@ -19,126 +19,87 @@ package org.apache.streampipes.manager.execution.http;
 
 import org.apache.streampipes.config.consul.ConsulSpConfig;
 import org.apache.streampipes.container.util.ConsulUtil;
-import org.apache.streampipes.manager.node.AvailableNodesFetcher;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.node.NodeInfo;
-
-import java.util.Optional;
 
 public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableStreamPipesEntity> {
 
-    private static final String COLON = ":";
-    private static final String SLASH = "/";
-    protected static final String SINK_IDENTIFIER = "sec";
-    protected static final String PROCESSOR_IDENTIFIER = "sepa";
-    private static final String DEFAULT_NODE_ID = "default";
-    private static final String PE_PORT_KEY = "SP_PORT";
-    private static final String PE_HOST_KEY = "SP_HOST";
-
-    private static final String NODE_CONTROLLER_ROUTE = "node/container";
+    private static final String DEFAULT_TARGET_NODE_ID = "default";
+    private static final String INVOKE_ROUTE = "api/v2/node/container/invoke";
+    private static final String DETACH_ROUTE = "api/v2/node/container/detach";
 
-    public InvocableEntityUrlGenerator(InvocableStreamPipesEntity pipelineElement) {
-        super(pipelineElement);
+    public InvocableEntityUrlGenerator(InvocableStreamPipesEntity graph) {
+        super(graph);
     }
 
     @Override
     public String generateInvokeEndpoint() {
-        if (pipelineElement.getDeploymentTargetNodeId() == null ||
-                pipelineElement.getDeploymentTargetNodeId().equals(DEFAULT_NODE_ID)) {
+        if (isDefaultTarget()) {
             // default deployments to primary pipeline element
-//            return URLPREFIX
-//                    + getHost()
-//                    + SLASH
-//                    + getIdentifier()
-//                    + SLASH
-//                    + pipelineElement.getAppId();
-            return defaultHost();
+            return getDefaultEndpoint();
         } else {
             // edge deployments to secondary pipeline element
-            return URLPREFIX
-                    + getHost()
-                    + SLASH
-                    + NODE_CONTROLLER_ROUTE
-                    + "invoke"
-                    + SLASH
-                    + getIdentifier()
-                    + SLASH
-                    + pipelineElement.getAppId();
+            return getDeploymentTargetEndpoint(INVOKE_ROUTE);
         }
     }
 
     @Override
     public String generateDetachEndpoint() {
-        // TODO: handle stop requests
-        return generateInvokeEndpoint()
-                + SLASH
-                + pipelineElement.getDeploymentRunningInstanceId();
-    }
-
-    private String getHost() {
-        if (pipelineElement.getDeploymentTargetNodeId() == null ||
-                pipelineElement.getDeploymentTargetNodeId().equals(DEFAULT_NODE_ID)) {
-            return defaultHost();
+        if (isDefaultTarget()) {
+            // detach primary pipeline element
+            return getDefaultEndpoint() + SLASH + graph.getDeploymentRunningInstanceId();
+        } else {
+            // detach edge deployments to secondary pipeline element
+            return getDeploymentTargetEndpoint(DETACH_ROUTE) + SLASH + graph.getDeploymentRunningInstanceId();
         }
-        else {
-            if (deploymentTargetNodeRunning()) {
-
-                String route = ConsulSpConfig.SERVICE_ROUTE_PREFIX
-                        + pipelineElement.getElementEndpointServiceName()
-                        + SLASH
-                        + ConsulSpConfig.BASE_PREFIX
-                        + SLASH
-                        + ConsulSpConfig.SECONDARY_NODE_KEY
-                        + SLASH
-                        + pipelineElement.getDeploymentTargetNodeId()
-                        + SLASH;
-
-                String host = ConsulUtil.getStringValue(route + PE_HOST_KEY);
-                int port = ConsulUtil.getIntValue(route + PE_PORT_KEY);
+    }
 
-                // Necessary because secondary pipeline element description is not stored in backend
-                // It uses information from primary pipeline element. Node controller will locally forward
-                // request accordingly, thus fields must be correct.
-                pipelineElement.setElementEndpointHostname(host);
-                pipelineElement.setElementEndpointPort(port);
-                pipelineElement.setBelongsTo(URLPREFIX + host + COLON + port + SLASH + getIdentifier() + SLASH + pipelineElement.getAppId());
-                pipelineElement.setElementId(pipelineElement.getBelongsTo() + SLASH + pipelineElement.getDeploymentRunningInstanceId());
+    // Helper methods
 
-                return pipelineElement.getDeploymentTargetNodeHostname()
-                        + COLON
-                        + pipelineElement.getDeploymentTargetNodePort();
-            }
-            else {
-                return defaultHost();
-            }
-        }
+    private boolean isDefaultTarget() {
+        return graph.getDeploymentTargetNodeId() == null ||
+                graph.getDeploymentTargetNodeId().equals(DEFAULT_TARGET_NODE_ID);
     }
 
-    private String defaultHost() {
-//        return pipelineElement.getElementEndpointHostname()
-//                + COLON
-//                + pipelineElement.getElementEndpointPort();
-        return pipelineElement.getBelongsTo();
+    private String getDefaultEndpoint() {
+        return graph.getBelongsTo();
     }
 
-    private String getIdentifier() {
-        return pipelineElement instanceof DataProcessorInvocation ? PROCESSOR_IDENTIFIER : SINK_IDENTIFIER;
+    private String getDeploymentTargetEndpoint(String route) {
+        modifyInvocableElement();
+        return HTTP_PROTOCOL + graph.getDeploymentTargetNodeHostname() + COLON + graph.getDeploymentTargetNodePort()
+                + SLASH
+                + route
+                + SLASH
+                + getIdentifier()
+                + SLASH
+                + graph.getAppId();
     }
 
-    private Optional<NodeInfo> getNodeInfo() {
-        return new AvailableNodesFetcher()
-                .fetchNodes()
-                .stream()
-                .filter(node -> node.getNodeControllerId().equals(pipelineElement.getDeploymentTargetNodeId()))
-                .findFirst();
+    private void modifyInvocableElement() {
+        // Necessary because secondary pipeline element description is not stored in backend
+        // It uses information from primary pipeline element. Node controller will locally forward
+        // request accordingly, thus fields must be correct.
+        String route = ConsulSpConfig.SERVICE_ROUTE_PREFIX
+                + graph.getElementEndpointServiceName()
+                + SLASH
+                + ConsulSpConfig.BASE_PREFIX
+                + SLASH
+                + ConsulSpConfig.SECONDARY_NODE_KEY
+                + SLASH
+                + graph.getDeploymentTargetNodeId()
+                + SLASH;
+
+        String host = ConsulUtil.getValueForRoute(route + "SP_HOST", String.class);
+        int port = ConsulUtil.getValueForRoute(route + "SP_PORT", Integer.class);
+        graph.setElementEndpointHostname(host);
+        graph.setElementEndpointPort(port);
+        graph.setBelongsTo(HTTP_PROTOCOL + host + COLON + port + SLASH + getIdentifier() + SLASH + graph.getAppId());
+        graph.setElementId(graph.getBelongsTo() + SLASH + graph.getDeploymentRunningInstanceId());
     }
 
-    private boolean deploymentTargetNodeRunning() {
-        return new AvailableNodesFetcher()
-                .fetchNodes()
-                .stream()
-                .anyMatch(node -> node.getNodeControllerId().equals(pipelineElement.getDeploymentTargetNodeId()));
+    private String getIdentifier() {
+        return graph instanceof DataProcessorInvocation ? "sepa" : "sec";
     }
 
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index f1fe90f..597e9d0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -111,7 +111,7 @@ public class InvocationGraphBuilder {
               // TODO: set event relay to true
               // TODO: add target edge node broker to List<EventRelays>
 
-              String broker = getEdgeBroker(t);
+              //String broker = getEdgeBroker(t);
 
               t.getInputStreams()
                       .get(getIndex(source.getDOM(), t))
@@ -185,10 +185,11 @@ public class InvocationGraphBuilder {
   }
 
   private String getEdgeBroker(InvocableStreamPipesEntity target) {
-    return ConsulUtil.getStringValue(
+    // TODO: no hardcoded route - only for testing
+    return ConsulUtil.getValueForRoute(
             "sp/v1/node/org.apache.streampipes.node.controller/"
-                    + target.getDeploymentTargetNodeId()
-                    + "/config/SP_NODE_BROKER_HOST");
+                    + target.getDeploymentTargetNodeHostname()
+                    + "/config/SP_NODE_BROKER_HOST", String.class);
   }
 
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
index bba6d68..4c7e027 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AvailableNodesFetcher.java
@@ -54,7 +54,7 @@ public class AvailableNodesFetcher {
 
     private NodeInfo fetchNodeInfo(String activeNode) throws IOException {
         String response = Request
-                .Get(activeNode + "/node/info")
+                .Get(activeNode + "/api/v2/node/info")
                 .addHeader("Accept", MediaType.APPLICATION_JSON)
                 .execute()
                 .returnContent()
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
index 8d9d8c4..db9e3db 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ConsulConfig.java
@@ -59,7 +59,7 @@ public class ConsulConfig extends AbstractRestInterface implements IConsulConfig
   @Override
   public Response getAllServiceConfigs() {
     LOG.info("Request for all service configs");
-    Map<String, Tuple2<String,String>> peServices = ConsulUtil.getPEServices();
+    Map<String, Tuple2<String,String>> peServices = ConsulUtil.getPeServices();
 
     List<PeConfig> peConfigs = new LinkedList<>();
 
diff --git a/ui/src/app/core-model/gen/streampipes-model-client.ts b/ui/src/app/core-model/gen/streampipes-model-client.ts
index 8962d8a..4e18b90 100644
--- a/ui/src/app/core-model/gen/streampipes-model-client.ts
+++ b/ui/src/app/core-model/gen/streampipes-model-client.ts
@@ -19,7 +19,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-11-17 20:41:04.
+// Generated using typescript-generator version 2.24.612 on 2020-11-25 23:55:36.
 
 export class FileMetadata {
     createdAt: number;
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 1481060..712f6d4 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,10 +19,10 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-11-17 20:40:59.
+// Generated using typescript-generator version 2.24.612 on 2020-11-25 23:55:30.
 
 export class AbstractStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+    "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
 
     static fromData(data: AbstractStreamPipesEntity, target?: AbstractStreamPipesEntity): AbstractStreamPipesEntity {
         if (!data) {
@@ -54,7 +54,7 @@ export class AccessibleSensorActuatorResource {
 }
 
 export class UnnamedStreamPipesEntity extends AbstractStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
+    "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
     elementId: string;
 
     static fromData(data: UnnamedStreamPipesEntity, target?: UnnamedStreamPipesEntity): UnnamedStreamPipesEntity {
@@ -2147,6 +2147,7 @@ export class NodeMetadata {
     nodeAddress: string;
     nodeLocationTags: string[];
     nodeModel: string;
+    nodeType: string;
 
     static fromData(data: NodeMetadata, target?: NodeMetadata): NodeMetadata {
         if (!data) {
@@ -2155,6 +2156,7 @@ export class NodeMetadata {
         const instance = target || new NodeMetadata();
         instance.nodeAddress = data.nodeAddress;
         instance.nodeModel = data.nodeModel;
+        instance.nodeType = data.nodeType;
         instance.nodeLocationTags = __getCopyArrayFn(__identity<string>())(data.nodeLocationTags);
         return instance;
     }
@@ -2285,6 +2287,31 @@ export class PipelineCategory {
     }
 }
 
+export class PipelineElementDockerContainer extends UnnamedStreamPipesEntity {
+    "@class": "org.apache.streampipes.model.node.PipelineElementDockerContainer";
+    containerName: string;
+    containerPorts: string[];
+    envVars: string[];
+    imageURI: string;
+    labels: { [index: string]: string };
+    serviceId: string;
+
+    static fromData(data: PipelineElementDockerContainer, target?: PipelineElementDockerContainer): PipelineElementDockerContainer {
+        if (!data) {
+            return data;
+        }
+        const instance = target || new PipelineElementDockerContainer();
+        super.fromData(data, instance);
+        instance.imageURI = data.imageURI;
+        instance.containerName = data.containerName;
+        instance.serviceId = data.serviceId;
+        instance.containerPorts = __getCopyArrayFn(__identity<string>())(data.containerPorts);
+        instance.envVars = __getCopyArrayFn(__identity<string>())(data.envVars);
+        instance.labels = __getCopyObjectFn(__identity<string>())(data.labels);
+        return instance;
+    }
+}
+
 export class PipelineElementRecommendation {
     count: number;
     description: string;
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index b40f136..247508f 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -143,7 +143,6 @@ export class SavePipelineComponent implements OnInit {
   modifyPipelineElementsDeployments(pipelineElements) {
     pipelineElements.forEach(p => {
       let selectedTargetNodeId = p.deploymentTargetNodeId
-      console.log(selectedTargetNodeId);
       if(selectedTargetNodeId != "default") {
         let selectedNode = this.edgeNodes
             .filter(node => node.nodeControllerId === selectedTargetNodeId)
@@ -155,7 +154,6 @@ export class SavePipelineComponent implements OnInit {
             .map(node => node.nodeControllerPort)[0]
       }
       else {
-        console.log('null');
         p.deploymentTargetNodeHostname = null
         p.deploymentTargetNodePort = null
       }