You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/01/08 23:00:57 UTC

[incubator-streampipes] 02/02: merged dev and refactored node controller api

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 503ebb814caa093970b05dbeb4add7584c0d360c
Merge: 9049d91 5efd810
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri Jan 8 23:59:18 2021 +0100

    merged dev and refactored node controller api

 README.md                                          |   6 +-
 pom.xml                                            |  36 ++++-
 .../backend/StreamPipesBackendApplication.java     |  54 ++++++-
 .../backend/StreamPipesResourceConfig.java         |   1 +
 streampipes-code-generation/pom.xml                |   9 ++
 streampipes-commons/pom.xml                        |   8 -
 .../java/org/apache/streampipes/commons/Utils.java |  36 -----
 .../master/management/AdapterMasterManagement.java |  15 +-
 .../container/master/rest/SourcesResource.java     |  33 ++--
 .../connect/container/worker/utils/Utils.java      |  38 +----
 streampipes-connect-container/pom.xml              |   2 +-
 .../connect/init/AdapterContainerConfig.java       |   4 +-
 .../connect/management/AdapterDeserializer.java    |  41 -----
 .../connect/management/AdapterUtilsTest.java       |   2 +-
 .../utils/ConnectContainerResourceTest.java        | 175 ---------------------
 streampipes-connect/pom.xml                        |   2 +-
 .../extensions/ExtensionsResourceConfig.java       |   9 +-
 .../PipelineElementContainerResourceConfig.java    |  19 +--
 streampipes-container/pom.xml                      |   8 +-
 ...t.java => AbstractPipelineElementResource.java} | 132 ++++++++--------
 ...a => DataProcessorPipelineElementResource.java} |  16 +-
 ...t.java => DataSinkPipelineElementResource.java} |  56 +------
 ...java => DataSourcePipelineElementResource.java} |  53 +++----
 ....java => InvocablePipelineElementResource.java} |  58 +++----
 ...t.java => PipelineElementTemplateResource.java} |   5 +-
 .../container/init/DeclarersSingleton.java         |  16 +-
 .../container/transform/Transformer.java           |  49 ------
 .../streampipes/container/util/DeclarerUtils.java  |  46 ------
 .../container/util/NodeControllerUtil.java         |   2 +-
 .../apache/streampipes/container/util/Util.java    |  17 +-
 .../org/streampipes/container/util/UtilTest.java   |  46 ------
 streampipes-model/pom.xml                          |  12 +-
 .../org/apache/streampipes/model/Response.java     |  30 ++--
 .../java/org/apache/streampipes/model/Tuple2.java  |  30 ++--
 .../model/base/AbstractStreamPipesEntity.java      |  27 +++-
 .../model/base/NamedStreamPipesEntity.java         |  17 --
 .../model/base/UnnamedStreamPipesEntity.java       |  17 --
 .../streampipes/model/pipeline/Pipeline.java       |  10 ++
 .../apache/streampipes/model/util/ModelUtils.java  |  14 --
 .../org/apache/streampipes/model/ResponseTest.java |  45 ------
 streampipes-node-controller-container/pom.xml      |   1 -
 .../controller/container/NodeControllerInit.java   |   2 +-
 .../container/{rest => api}/AbstractResource.java  |   2 +-
 .../api/AdapterDataStreamRelayResource.java        |  46 ++++++
 .../container/{rest => api}/ConnectResource.java   |   5 +-
 .../container/api/ContainerResource.java           |  50 ++++++
 .../api/DataProcessorPipelineElementResource.java  |  20 ++-
 .../api/DataSinkPipelineElementResource.java       |  20 +--
 .../{rest => api}/HealthCheckResource.java         |   4 +-
 .../{rest => api}/InfoStatusResource.java          |  37 +----
 .../container/api/InvocableEntityResource.java     | 107 +++++++++++++
 .../NodeControllerResourceConfig.java              |   8 +-
 .../management/connect/ConnectManager.java         |   1 -
 .../management/pe/InvocableElementManager.java     |  34 ++--
 .../management/pe/InvocableLifeCycle.java          |   2 +-
 .../management/relay/DataStreamRelayManager.java   | 109 +++++++++++++
 .../container/rest/DataStreamRelayResource.java    |  80 ----------
 .../container/rest/InvocableEntityResource.java    | 160 -------------------
 streampipes-pipeline-management/pom.xml            |  10 ++
 .../manager/endpoint/EndpointItemFetcher.java      |  14 +-
 .../manager/endpoint/EndpointItemParser.java       |  20 ++-
 .../manager/execution/http/HttpRequestBuilder.java |  23 ++-
 .../http/InvocableEntityUrlGenerator.java          |  11 +-
 .../matching/output/ListOutputSchemaGenerator.java |   4 +-
 .../matching/output/PropertyDuplicateRemover.java  |   8 +-
 .../manager/node/NodeClusterManager.java           |   2 +-
 .../manager/verification/ElementVerifier.java      |  22 ++-
 .../manager/verification/SepVerifier.java          |   5 +-
 .../verification/extractor/TypeExtractor.java      |  63 ++++----
 .../streampipes/manager/matching/v2/TestUtils.java |  16 +-
 streampipes-rest-shared/pom.xml                    |   2 +-
 .../streampipes/rest/shared/util/SpMediaType.java  |   5 +-
 streampipes-rest/pom.xml                           |  10 ++
 .../rest/api/IMeasurementUnitResource.java         |   2 +
 .../rest/impl/AbstractRestInterface.java           |  16 +-
 .../apache/streampipes/rest/impl/Deployment.java   |  32 ++--
 .../rest/impl/MeasurementUnitResource.java         |   9 ++
 .../rest}/serializer/JsonLdProvider.java           |   4 +-
 .../apache/streampipes/rest}/util/JsonLdUtils.java |   7 +-
 .../streampipes/rest}/util/JsonLdUtilsTest.java    |   4 +-
 .../streampipes/sdk/helpers/EpProperties.java      |   1 -
 .../pom.xml                                        |  13 +-
 .../serializers/json/AdapterSerializer.java        |   0
 .../serializers/json/AdapterTypeAdapter.java       |   0
 .../serializers/json/EcTypeAdapter.java            |   0
 .../serializers/json/EpaTypeAdapter.java           |   0
 .../serializers/json/GsonSerializer.java           |   0
 .../serializers/json/JacksonSerializer.java        |   0
 .../serializers/json/JsonLdSerializer.java         |   0
 .../serializers/json/PeTypeAdapter.java            |   0
 .../json/ProcessingElementSerializer.java          |   0
 .../serializers/json/RangeSerializer.java          |  34 ++--
 .../json/RuntimeTypeAdapterFactory.java            |   0
 .../json/TransformationRuleSerializer.java         |   0
 .../serializers/json/UriSerializer.java            |   0
 .../apache/streampipes/serializers/json/Utils.java |   0
 .../pom.xml                                        |  20 ++-
 .../jsonld/CustomAnnotationProvider.java           |   0
 .../serializers/jsonld/JsonLdTransformer.java      |   0
 .../serializers/jsonld/JsonLdUtils.java            |  52 ++----
 .../serializers/jsonld/RdfTransformer.java         |   0
 streampipes-storage-api/pom.xml                    |   8 +
 streampipes-storage-couchdb/pom.xml                |   2 +-
 .../storage/couchdb/dao/AbstractDao.java           |   6 +-
 .../storage/couchdb/dao/PersistCommand.java        |   8 +-
 streampipes-storage-rdf4j/pom.xml                  |   2 +-
 ui/package.json                                    |  50 +++---
 .../configuration/configuration.component.spec.ts  |   8 +-
 .../consul-configs-password.component.spec.ts      |  10 +-
 .../consul-service.component.spec.ts               |  20 +--
 .../edit-unit-transformation.component.css         |   4 -
 .../edit-unit-transformation.component.html        |   2 +-
 .../apis/measurement-units.service.ts              |  32 ++--
 ui/src/app/platform-services/platform.module.ts    |   4 +-
 ui/src/tsconfig.app.json                           |   2 +-
 ui/src/tsconfig.spec.json                          |   2 +-
 ui/tsconfig.base.json                              |  26 ---
 ui/tsconfig.json                                   |  39 +++--
 ui/tsconfig.spec.json                              |   2 +-
 119 files changed, 1004 insertions(+), 1516 deletions(-)

diff --cc pom.xml
index 7f2520f,ec9c0d5..28b9c5c
--- a/pom.xml
+++ b/pom.xml
@@@ -56,8 -56,7 +56,9 @@@
          <geojson-jackson.version>1.8</geojson-jackson.version>
          <guava.version>27.1-jre</guava.version>
          <hibernate-validator.version>6.1.5.Final</hibernate-validator.version>
+         <httpclient.version>4.5.13</httpclient.version>
 +        <hawtbuf.version>1.11</hawtbuf.version>
 +        <httpclient.version>4.5.10</httpclient.version>
          <httpcore.version>4.4.9</httpcore.version>
          <httpcore-osgi.version>4.4.9</httpcore-osgi.version>
          <influxdb.version>2.16</influxdb.version>
@@@ -327,6 -326,6 +328,11 @@@
              </dependency>
              <dependency>
                  <groupId>org.apache.httpcomponents</groupId>
++                <artifactId>httpclient-cache</artifactId>
++                <version>${httpclient.version}</version>
++            </dependency>
++            <dependency>
++                <groupId>org.apache.httpcomponents</groupId>
                  <artifactId>httpcore-osgi</artifactId>
                  <version>${httpcore-osgi.version}</version>
              </dependency>
diff --cc streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
index 9018628,5b87557..1797897
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
@@@ -195,14 -181,10 +181,14 @@@ public abstract class AbstractPipelineE
          }
        } else if (desc instanceof ConsumableStreamPipesEntity) {
          Collection<TransportProtocol> supportedProtocols =
-                 declarersSingleton.getSupportedProtocols();
+                 DeclarersSingleton.getInstance().getSupportedProtocols();
          Collection<TransportFormat> supportedFormats =
-                 declarersSingleton.getSupportedFormats();
+                 DeclarersSingleton.getInstance().getSupportedFormats();
  
-         ((ConsumableStreamPipesEntity) desc).setElementEndpointHostname(declarersSingleton.getHostname());
-         ((ConsumableStreamPipesEntity) desc).setElementEndpointPort(declarersSingleton.getPort());
-         ((ConsumableStreamPipesEntity) desc).setElementEndpointServiceName(declarersSingleton.getServiceName());
++        ((ConsumableStreamPipesEntity) desc).setElementEndpointHostname(DeclarersSingleton.getInstance().getHostname());
++        ((ConsumableStreamPipesEntity) desc).setElementEndpointPort(DeclarersSingleton.getInstance().getPort());
++        ((ConsumableStreamPipesEntity) desc).setElementEndpointServiceName(DeclarersSingleton.getInstance().getServiceName());
 +
          if (supportedProtocols.size() > 0 && supportedFormats.size() > 0) {
            // Overwrite existing grounding from default provided by declarers singleton
            ((ConsumableStreamPipesEntity) desc)
diff --cc streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
index eb79cce,5664a57..a83b595
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
@@@ -71,18 -61,17 +61,18 @@@ public abstract class InvocablePipeline
              InvocableDeclarer declarer = (InvocableDeclarer) getDeclarerById(elementId);
  
              if (declarer != null) {
 -                String runningInstanceId = getInstanceId(graph.getElementId(), elementId);
 +                //String runningInstanceId = getInstanceId(graph.getElementId(), elementId);
 +                String runningInstanceId = graph.getDeploymentRunningInstanceId();
                  RunningInstances.INSTANCE.add(runningInstanceId, graph, declarer.getClass().newInstance());
                  Response resp = RunningInstances.INSTANCE.getInvocation(runningInstanceId).invokeRuntime(graph);
-                 return Util.toResponseString(resp);
+                 return ok(resp);
              }
-         } catch (RDFParseException | IOException | RepositoryException | InstantiationException | IllegalAccessException e) {
+         } catch (InstantiationException | IllegalAccessException e) {
              e.printStackTrace();
-             return Util.toResponseString(new Response(elementId, false, e.getMessage()));
+             return ok(new Response(elementId, false, e.getMessage()));
          }
  
-         return Util.toResponseString(elementId, false, "Could not find the element with id: " + elementId);
+         return ok(new Response(elementId, false, "Could not find the element with id: " + elementId));
      }
  
      @POST
diff --cc streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
index 903321d,0000000..be7ee90
mode 100644,000000..100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
@@@ -1,134 -1,0 +1,134 @@@
 +package org.apache.streampipes.container.util;/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
 +
 +import com.fasterxml.jackson.core.JsonProcessingException;
 +import org.apache.http.client.fluent.Request;
 +import org.apache.http.entity.ContentType;
 +import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
 +import org.apache.streampipes.container.model.node.InvocableRegistration;
 +import org.apache.streampipes.container.model.consul.ConsulServiceRegistrationBody;
 +import org.apache.streampipes.container.model.consul.HealthCheckConfiguration;
 +import org.apache.streampipes.serializers.json.JacksonSerializer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.util.*;
 +
 +public class NodeControllerUtil {
 +    static Logger LOG = LoggerFactory.getLogger(NodeControllerUtil.class);
 +
 +    private static final String HTTP_PROTOCOL = "http://";
 +    private static final String COLON = ":";
 +    private static final String SLASH = "/";
 +    private static final String HEALTH_CHECK_INTERVAL = "10s";
 +    private static final String PE_TAG = "pe";
 +    private static final String SECONDARY_PE_IDENTIFIER_TAG = "secondary";
-     private static final String NODE_CONTROLLER_REGISTER_SVC_URL = "api/v2/node/container/register";
++    private static final String NODE_CONTROLLER_REGISTER_SVC_URL = "api/v2/node/element/register";
 +
 +    private static final String NODE_CONTROLLER_CONTAINER_HOST = "SP_NODE_CONTROLLER_CONTAINER_HOST";
 +    private static final String NODE_CONTROLLER_CONTAINER_PORT = "SP_NODE_CONTROLLER_CONTAINER_PORT";
 +
 +    public static void register(String serviceID, String host, int port,
 +                                Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
 +        register(PE_TAG, makeSvcId(host, serviceID), host, port,
 +                Arrays.asList(PE_TAG, SECONDARY_PE_IDENTIFIER_TAG), epaDeclarers);
 +    }
 +
 +    public static void register(String svcName, String svcId, String host, int port, List<String> tag,
 +                                Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
 +        boolean connected = false;
 +
 +        while (!connected) {
 +            LOG.info("Trying to register pipeline element container at node controller: " + makeRegistrationEndpoint());
 +            String body = createSvcBody(svcName, svcId, host, port, tag, epaDeclarers);
 +            connected = registerSvcHttpClient(body);
 +
 +            if (!connected) {
 +                LOG.info("Retrying in 5 seconds");
 +                try {
 +                    Thread.sleep(5000);
 +                } catch (InterruptedException e) {
 +                    e.printStackTrace();
 +                }
 +            }
 +        }
 +        LOG.info("Successfully registered pipeline element container: " + svcId);
 +    }
 +
 +    private static boolean registerSvcHttpClient(String body) {
 +        String endpoint = makeRegistrationEndpoint();
 +        try {
 +            Request.Post(makeRegistrationEndpoint())
 +                    .bodyString(body, ContentType.APPLICATION_JSON)
 +                    .connectTimeout(1000)
 +                    .socketTimeout(100000)
 +                    .execute();
 +            return true;
 +        } catch (IOException e) {
 +            LOG.error("Could not register at " + endpoint);
 +        }
 +        return false;
 +    }
 +
 +    private static String createSvcBody(String name, String id, String host, int port, List<String> tags,
 +                                        Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
 +        try {
 +            ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
 +            String healthCheckURL = HTTP_PROTOCOL + host + COLON + port;
 +            body.setID(id);
 +            body.setName(name);
 +            body.setTags(tags);
 +            body.setAddress(HTTP_PROTOCOL + host);
 +            body.setPort(port);
 +            body.setEnableTagOverride(true);
 +            body.setCheck(new HealthCheckConfiguration("GET", healthCheckURL, HEALTH_CHECK_INTERVAL));
 +
 +            InvocableRegistration svcBody = new InvocableRegistration();
 +            svcBody.setConsulServiceRegistrationBody(body);
 +            svcBody.setSupportedPipelineElementAppIds(new ArrayList<>(epaDeclarers.keySet()));
 +
 +            return JacksonSerializer.getObjectMapper().writeValueAsString(svcBody);
 +        } catch (JsonProcessingException e) {
 +            e.printStackTrace();
 +        }
 +        throw new IllegalArgumentException("Failure");
 +    }
 +
 +    private static String makeRegistrationEndpoint() {
 +        if (System.getenv(NODE_CONTROLLER_CONTAINER_HOST) != null) {
 +            return HTTP_PROTOCOL
 +                    + System.getenv(NODE_CONTROLLER_CONTAINER_HOST)
 +                    + COLON
 +                    + System.getenv(NODE_CONTROLLER_CONTAINER_PORT)
 +                    + SLASH
 +                    + NODE_CONTROLLER_REGISTER_SVC_URL;
 +        } else {
 +            return HTTP_PROTOCOL
 +                    + "localhost"
 +                    + COLON
 +                    + "7077"
 +                    + SLASH
 +                    + NODE_CONTROLLER_REGISTER_SVC_URL;
 +        }
 +    }
 +
 +    private static String makeSvcId(String host, String serviceID) {
 +        return host + SLASH + serviceID;
 +    }
 +}
diff --cc streampipes-container/src/main/java/org/apache/streampipes/container/util/Util.java
index 5b39877,0761f12..572b00d
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/Util.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/Util.java
@@@ -26,11 -26,11 +26,11 @@@ public class Util 
  
      private static final String Slash = "/";
  
--    public static String getInstanceId(String url, String type, String elemntId) {
++    public static String getInstanceId(String url, String type, String elementId) {
          return url.replace(DeclarersSingleton.getInstance().getBaseUri()
                  + type
                  + Slash
--                + elemntId
++                + elementId
                  + Slash, "");
      }
  
diff --cc streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
index 58774ca,2e1ad1a..8394db8
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
@@@ -134,14 -135,14 +136,22 @@@ public class Pipeline extends ElementCo
      this.createdAt = createdAt;
    }
  
 +  public String getEventRelayStrategy() {
 +    return eventRelayStrategy;
 +  }
 +
 +  public void setEventRelayStrategy(String eventRelayStrategy) {
 +    this.eventRelayStrategy = eventRelayStrategy;
 +  }
 +
+   public boolean isRestartOnSystemReboot() {
+     return restartOnSystemReboot;
+   }
+ 
+   public void setRestartOnSystemReboot(boolean restartOnSystemReboot) {
+     this.restartOnSystemReboot = restartOnSystemReboot;
+   }
+ 
    public Pipeline clone() {
      Pipeline pipeline = new Pipeline();
      pipeline.setName(name);
diff --cc streampipes-node-controller-container/pom.xml
index a593487,d9444f2..4076064
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@@ -75,79 -108,31 +75,78 @@@
              <version>0.68.0-SNAPSHOT</version>
          </dependency>
  
 -        <!-- External dependencies -->
 -        <dependency>
 -            <groupId>org.apache.maven.shared</groupId>
 -            <artifactId>maven-invoker</artifactId>
 -        </dependency>
 -        <dependency>
 -            <groupId>org.jgrapht</groupId>
 -            <artifactId>jgrapht-core</artifactId>
 -        </dependency>
 -        <dependency>
 -            <groupId>org.objenesis</groupId>
 -            <artifactId>objenesis</artifactId>
 -        </dependency>
 -
 -        <!-- Test dependencies -->
 -        <dependency>
 -            <groupId>junit</groupId>
 -            <artifactId>junit</artifactId>
 -            <scope>test</scope>
 -        </dependency>
 -        <dependency>
 -            <groupId>org.apache.streampipes</groupId>
 -            <artifactId>streampipes-measurement-units</artifactId>
 -            <version>0.68.0-SNAPSHOT</version>
 -            <scope>test</scope>
 +        <!--external dependencies-->
 +        <!-- parse yaml config -->
 +<!--        <dependency>-->
 +<!--            <groupId>org.yaml</groupId>-->
 +<!--            <artifactId>snakeyaml</artifactId>-->
 +<!--            <version>1.21</version>-->
 +<!--        </dependency>-->
 +        <!-- docker client for java -->
 +        <dependency>
 +            <groupId>com.spotify</groupId>
 +            <artifactId>docker-client</artifactId>
 +            <classifier>shaded</classifier>
 +            <version>8.16.0</version>
 +            <exclusions>
 +                <exclusion>
 +                    <groupId>org.ow2.asm</groupId>
 +                    <artifactId>asm-tree</artifactId>
 +                </exclusion>
 +                <exclusion>
 +                    <groupId>org.ow2.asm</groupId>
 +                    <artifactId>asm-util</artifactId>
 +                </exclusion>
 +                <exclusion>
 +                    <groupId>org.ow2.asm</groupId>
 +                    <artifactId>asm-commons</artifactId>
 +                </exclusion>
 +                <exclusion>
 +                    <groupId>org.ow2.asm</groupId>
 +                    <artifactId>asm-analysis</artifactId>
 +                </exclusion>
 +                <exclusion>
 +                    <groupId>com.fasterxml.jackson.module</groupId>
 +                    <artifactId>jackson-module-jaxb-annotations</artifactId>
 +                </exclusion>
 +                <exclusion>
 +                    <groupId>com.fasterxml.jackson.datatype</groupId>
 +                    <artifactId>jackson-datatype-guava</artifactId>
 +                </exclusion>
 +            </exclusions>
 +        </dependency>
 +<!--        </dependency>-->
 +        <!-- used for gathering system information-->
 +        <dependency>
 +            <groupId>com.github.oshi</groupId>
 +            <artifactId>oshi-core</artifactId>
 +            <version>5.3.6</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.eclipse.paho</groupId>
 +            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
 +            <version>1.2.4</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.kafka</groupId>
 +            <artifactId>kafka-clients</artifactId>
-             <version>2.2.0</version>
          </dependency>
      </dependencies>
 +
 +    <build>
 +        <plugins>
 +            <plugin>
 +                <groupId>org.springframework.boot</groupId>
 +                <artifactId>spring-boot-maven-plugin</artifactId>
 +                <executions>
 +                    <execution>
 +                        <goals>
 +                            <goal>repackage</goal>
 +                        </goals>
 +                    </execution>
 +                </executions>
 +            </plugin>
 +        </plugins>
 +        <finalName>streampipes-node-controller-container</finalName>
 +    </build>
  </project>
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
index be6c385,0000000..48e2e88
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
@@@ -1,78 -1,0 +1,78 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
 +package org.apache.streampipes.node.controller.container;
 +
 +import org.apache.streampipes.container.util.ConsulUtil;
 +import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerManager;
- import org.apache.streampipes.node.controller.container.rest.NodeControllerResourceConfig;
++import org.apache.streampipes.node.controller.container.api.NodeControllerResourceConfig;
 +import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 +import org.apache.streampipes.node.controller.container.management.node.NodeManager;
 +import org.apache.streampipes.node.controller.container.management.janitor.JanitorManager;
 +import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.springframework.boot.SpringApplication;
 +import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 +import org.springframework.context.annotation.Configuration;
 +import org.springframework.context.annotation.Import;
 +
 +import javax.annotation.PreDestroy;
 +import java.util.Collections;
 +
 +@Configuration
 +@EnableAutoConfiguration
 +@Import({ NodeControllerResourceConfig.class })
 +public class NodeControllerInit {
 +
 +    private static final Logger LOG =
 +            LoggerFactory.getLogger(NodeControllerInit.class.getCanonicalName());
 +
 +    public static void main(String [] args) {
 +
 +        NodeControllerConfig conf = NodeControllerConfig.INSTANCE;
 +
 +        SpringApplication app = new SpringApplication(NodeControllerInit.class);
 +        app.setDefaultProperties(Collections.singletonMap("server.port", conf.getNodeControllerPort()));
 +        app.run();
 +
 +        LOG.info("Load node info");
 +        NodeManager.getInstance().init();
 +
 +        LOG.info("Start Node resource manager");
 +        ResourceManager.getInstance().run();
 +
 +        if (!"true".equals(System.getenv("SP_DEBUG"))) {
 +            LOG.info("Auto-deploy StreamPipes node container");
 +            DockerContainerManager.getInstance().init();
 +
 +            LOG.info("Start Janitor manager");
 +            JanitorManager.getInstance().run();
 +        }
 +
 +        // registration with consul here
 +        ConsulUtil.registerNodeService(
 +                conf.getNodeServiceId(),
 +                conf.getNodeHostName(),
 +                conf.getNodeControllerPort()
 +        );
 +    }
 +
 +    @PreDestroy
 +    public void onExit(){
 +    }
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AbstractResource.java
index 4d25c34,0000000..dc18520
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AbstractResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AbstractResource.java
@@@ -1,61 -1,0 +1,61 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
- package org.apache.streampipes.node.controller.container.rest;
++package org.apache.streampipes.node.controller.container.api;
 +
 +import org.apache.streampipes.model.message.Message;
 +
 +import javax.ws.rs.core.Response;
 +
 +
 +public abstract class AbstractResource {
 +
 +    protected <T> Response ok(T entity) {
 +        return Response
 +                .ok()
 +                .entity(entity)
 +                .build();
 +    }
 +
 +    protected <T> Response ok() {
 +        return Response
 +                .ok()
 +                .build();
 +    }
 +
 +    protected <T> Response error(T entity) {
 +        return Response
 +                .status(500)
 +                .entity(entity)
 +                .build();
 +    }
 +
 +    protected Response statusMessage(Message message) {
 +        return ok(message);
 +    }
 +
 +    protected Response fail() {
 +        return Response.serverError().build();
 +    }
 +
 +    protected <T> Response fail(T entity) {
 +        return Response
 +                .serverError()
 +                .entity(entity)
 +                .build();
 +    }
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java
index 0000000,0000000..00bb34a
new file mode 100644
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java
@@@ -1,0 -1,0 +1,46 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *    http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ */
++package org.apache.streampipes.node.controller.container.api;
++
++import org.apache.streampipes.model.SpDataStreamRelayContainer;
++import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
++import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
++
++import javax.ws.rs.*;
++import javax.ws.rs.core.MediaType;
++
++@Path("/api/v2/node/stream/relay")
++public class AdapterDataStreamRelayResource extends AbstractResource {
++
++    @POST
++    @JacksonSerialized
++    @Path("/invoke")
++    @Consumes(MediaType.APPLICATION_JSON)
++    @Produces(MediaType.APPLICATION_JSON)
++    public javax.ws.rs.core.Response invoke(SpDataStreamRelayContainer graph) {
++        return ok(DataStreamRelayManager.getInstance().startAdapterDataStreamRelay(graph));
++    }
++
++    @DELETE
++    @Path("/detach/{runningInstanceId}")
++    @Consumes(MediaType.APPLICATION_JSON)
++    @Produces(MediaType.APPLICATION_JSON)
++    public javax.ws.rs.core.Response detach(@PathParam("runningInstanceId") String runningInstanceId) {
++        return ok(DataStreamRelayManager.getInstance().stopAdapterDataStreamRelay(runningInstanceId));
++    }
++}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ConnectResource.java
index 0439633,0000000..2865cc1
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ConnectResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ConnectResource.java
@@@ -1,151 -1,0 +1,148 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
- package org.apache.streampipes.node.controller.container.rest;
++package org.apache.streampipes.node.controller.container.api;
 +
 +import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 +import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
 +import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
 +import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
 +import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 +import org.apache.streampipes.node.controller.container.management.connect.ConnectManager;
 +import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
 +
 +import javax.ws.rs.*;
 +import javax.ws.rs.core.MediaType;
 +import javax.ws.rs.core.Response;
 +
 +
 +@Path("/api/v1/{username}/worker")
 +public class ConnectResource extends AbstractResource {
-     private static final Logger LOG = LoggerFactory.getLogger(ConnectResource.class.getCanonicalName());
 +
 +    // Registration
 +   @POST
 +   @JacksonSerialized
 +   @Path("/register")
 +   @Consumes(MediaType.APPLICATION_JSON)
 +   @Produces(MediaType.APPLICATION_JSON)
 +   public Response register(@PathParam("username") String username, ConnectWorkerContainer cw) {
 +       return ok(ConnectManager.getInstance().register(username, cw));
 +   }
 +
 +    // AdapterResource
 +    @GET
 +    @Path("/adapters/{id}/assets")
 +    @Produces("application/zip")
 +    public Response getAdapterAssets(@PathParam("username") String username, @PathParam("id") String appId) {
 +        return ok(ConnectManager.getInstance().assets(username, appId, "adapter", "/"));
 +    }
 +
 +    @GET
 +    @Path("/adapters/{id}/assets/icon")
 +    @Produces("image/png")
 +    public Response getAdapterIconAsset(@PathParam("username") String username, @PathParam("id") String appId) {
 +        return ok(ConnectManager.getInstance().assets(username, appId, "adapter", "/icon"));
 +    }
 +
 +    @GET
 +    @Path("/adapters/{id}/assets/documentation")
 +    @Produces(MediaType.TEXT_PLAIN)
 +    public String getAdapterDocumentationAsset(@PathParam("username") String username, @PathParam("id") String appId) {
 +        return ConnectManager.getInstance()
 +                .assets(username, appId, "adapter", "/documentation").toString();
 +    }
 +
 +    // ProtocolResource
 +    @GET
 +    @Path("/protocols/{id}/assets")
 +    @Produces("application/zip")
 +    public Response getProtocolAssets(@PathParam("username") String username, @PathParam("id") String appId) {
 +        return ok(ConnectManager.getInstance().assets(username, appId, "protocol", "/"));
 +    }
 +
 +    @GET
 +    @Path("/protocols/{id}/assets/icon")
 +    @Produces("image/png")
 +    public Response getProtocolIconAsset(@PathParam("username") String username, @PathParam("id") String appId) {
 +        return ok(ConnectManager.getInstance().assets(username, appId, "protocol", "/icon"));
 +    }
 +
 +    @GET
 +    @Path("/protocols/{id}/assets/documentation")
 +    @Produces(MediaType.TEXT_PLAIN)
 +    public String getProtocolDocumentationAsset(@PathParam("username") String username,
 +                                             @PathParam("id") String appId) {
 +        return ConnectManager.getInstance()
 +                .assets(username, appId, "protocol", "documentation").toString();
 +    }
 +
 +    // WorkerResource
 +    @POST
 +    @JacksonSerialized
 +    @Path("/stream/invoke")
 +    @Consumes(MediaType.APPLICATION_JSON)
 +    @Produces(MediaType.APPLICATION_JSON)
 +    public Response invokeStreamAdapter(@PathParam("username") String username, AdapterStreamDescription ad) {
 +        return ok(ConnectManager.getInstance().invoke(username, ad));
 +    }
 +
 +    @POST
 +    @JacksonSerialized
 +    @Path("/stream/stop")
 +    @Consumes(MediaType.APPLICATION_JSON)
 +    @Produces(MediaType.APPLICATION_JSON)
 +    public Response stopStreamAdapter(@PathParam("username") String username, AdapterStreamDescription ad) {
 +        return ok(ConnectManager.getInstance().stop(username, ad));
 +    }
 +
 +    @POST
 +    @JacksonSerialized
 +    @Path("/set/invoke")
 +    @Consumes(MediaType.APPLICATION_JSON)
 +    @Produces(MediaType.APPLICATION_JSON)
 +    public Response invokeSetAdapter(@PathParam("username") String username, AdapterSetDescription ad) {
 +        return ok(ConnectManager.getInstance().invoke(username, ad));
 +    }
 +
 +    @POST
 +    @JacksonSerialized
 +    @Path("/set/stop")
 +    @Consumes(MediaType.APPLICATION_JSON)
 +    @Produces(MediaType.APPLICATION_JSON)
 +    public Response stopSetAdapter(@PathParam("username") String username, AdapterSetDescription ad){
 +        return ok(ConnectManager.getInstance().stop(username, ad));
 +    }
 +
 +    @POST
 +    @JacksonSerialized
 +    @Path("/guess/schema")
 +    @Produces(MediaType.APPLICATION_JSON)
 +    public Response guessSchema(@PathParam("username") String username, AdapterDescription ad) {
 +       return ok(ConnectManager.getInstance().guess(username, ad));
 +    }
 +
 +    // RuntimeResolvableResource
 +    @POST
 +    @Path("/resolvable/{id}/configurations")
 +    @JacksonSerialized
 +    @Produces(MediaType.APPLICATION_JSON)
 +    @Consumes(MediaType.APPLICATION_JSON)
 +    public Response fetchConfigurations(@PathParam("username") String username, @PathParam("id") String appId,
 +                                        RuntimeOptionsRequest runtimeOptions) {
 +       return ok(ConnectManager.getInstance().fetchConfigurations(username, appId, runtimeOptions));
 +    }
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerResource.java
index 0000000,0000000..21733ef
new file mode 100644
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerResource.java
@@@ -1,0 -1,0 +1,50 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *    http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ */
++package org.apache.streampipes.node.controller.container.api;
++
++import org.apache.streampipes.model.node.container.DockerContainer;
++import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerManager;
++import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
++
++import javax.ws.rs.*;
++import javax.ws.rs.core.MediaType;
++
++@Path("/api/v2/node/container")
++public class ContainerResource extends AbstractResource {
++
++    @GET
++    @Produces(MediaType.APPLICATION_JSON)
++    public javax.ws.rs.core.Response getPipelineElementContainer(){
++        return ok(DockerContainerManager.getInstance().list());
++    }
++
++    @POST
++    @Path("/deploy")
++    @Consumes(MediaType.APPLICATION_JSON)
++    public javax.ws.rs.core.Response deployPipelineElementContainer(DockerContainer container) {
++        return ok(DockerContainerManager.getInstance().deploy(container));
++    }
++
++    @DELETE
++    @Path("/remove")
++    @Consumes(MediaType.APPLICATION_JSON)
++    public javax.ws.rs.core.Response removePipelineElementContainer(DockerContainer container) {
++        InvocableElementManager.getInstance().unregister();
++        return ok(DockerContainerManager.getInstance().remove(container));
++    }
++}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
index 573d13d,f3eb16a..6e5ebb0
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
@@@ -1,32 -1,29 +1,30 @@@
 -/*
 - * Licensed to the Apache Software Foundation (ASF) under one or more
 - * contributor license agreements.  See the NOTICE file distributed with
 - * this work for additional information regarding copyright ownership.
 - * The ASF licenses this file to You under the Apache License, Version 2.0
 - * (the "License"); you may not use this file except in compliance with
 - * the License.  You may obtain a copy of the License at
 - *
 - *    http://www.apache.org/licenses/LICENSE-2.0
 - *
 - * Unless required by applicable law or agreed to in writing, software
 - * distributed under the License is distributed on an "AS IS" BASIS,
 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 - * See the License for the specific language governing permissions and
 - * limitations under the License.
 - *
 - */
 -
 -import {StreampipesPeContainerConifgs} from "./streampipes-pe-container-configs";
 -
 -//ConsulService = StreampipesPeContainer ERLEDIGT
 -export interface StreampipesPeContainer {
 -    name: string;
 -    mainKey: string;
 -    meta: {
 -        status: string;
 -    }
 -    configs: [StreampipesPeContainerConifgs];
 -}
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
++package org.apache.streampipes.node.controller.container.api;
 +
- package org.apache.streampipes.rest.shared.annotation;
++import org.apache.streampipes.model.graph.DataProcessorInvocation;
 +
- import javax.ws.rs.NameBinding;
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
++import javax.ws.rs.Path;
 +
- @NameBinding
- @Target({ElementType.TYPE, ElementType.METHOD})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface RdfRootElement {
-     String value();
++@Path("/api/v2/node/element/sepa")
++public class DataProcessorPipelineElementResource extends InvocableEntityResource<DataProcessorInvocation> {
++
++    public DataProcessorPipelineElementResource() {
++        super(DataProcessorInvocation.class);
++    }
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
index 0e851d6,f3eb16a..af99b25
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
@@@ -1,30 -1,29 +1,30 @@@
 -/*
 - * Licensed to the Apache Software Foundation (ASF) under one or more
 - * contributor license agreements.  See the NOTICE file distributed with
 - * this work for additional information regarding copyright ownership.
 - * The ASF licenses this file to You under the Apache License, Version 2.0
 - * (the "License"); you may not use this file except in compliance with
 - * the License.  You may obtain a copy of the License at
 - *
 - *    http://www.apache.org/licenses/LICENSE-2.0
 - *
 - * Unless required by applicable law or agreed to in writing, software
 - * distributed under the License is distributed on an "AS IS" BASIS,
 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 - * See the License for the specific language governing permissions and
 - * limitations under the License.
 - *
 - */
 -
 -import {StreampipesPeContainerConifgs} from "./streampipes-pe-container-configs";
 -
 -//ConsulService = StreampipesPeContainer ERLEDIGT
 -export interface StreampipesPeContainer {
 -    name: string;
 -    mainKey: string;
 -    meta: {
 -        status: string;
 -    }
 -    configs: [StreampipesPeContainerConifgs];
 -}
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
- package org.apache.streampipes.rest.shared.annotation;
++package org.apache.streampipes.node.controller.container.api;
 +
- import javax.ws.rs.NameBinding;
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
++import org.apache.streampipes.model.graph.DataSinkInvocation;
 +
- @NameBinding
- @Target({ElementType.TYPE, ElementType.METHOD})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface JsonLdSerialized {
++import javax.ws.rs.Path;
++
++@Path("/api/v2/node/element/sec")
++public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
++
++    public DataSinkPipelineElementResource() {
++        super(DataSinkInvocation.class);
++    }
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
index 37796be,ff6da01..8c593a4
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
@@@ -15,22 -15,19 +15,22 @@@
   * limitations under the License.
   *
   */
- package org.apache.streampipes.node.controller.container.rest;
++package org.apache.streampipes.node.controller.container.api;
  
 -import {Component, Input} from '@angular/core';
 -import {StreampipesPeContainerConifgs} from "../shared/streampipes-pe-container-configs";
 -import {ConfigurationService} from '../shared/configuration.service';
 +import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
  
 -@Component({
 -    selector: 'consul-configs-number',
 -    templateUrl: './consul-configs-number.component.html',
 -    styleUrls: ['./consul-configs-number.component.css']
 -})
 -export class ConsulConfigsNumberComponent {
 -    @Input() configuration: StreampipesPeContainerConifgs
 -    constructor(public configService:ConfigurationService) {
 -    }
 +import javax.ws.rs.GET;
 +import javax.ws.rs.Path;
 +import javax.ws.rs.Produces;
 +import javax.ws.rs.core.MediaType;
 +import javax.ws.rs.core.Response;
 +
 +@Path("/")
 +public class HealthCheckResource extends AbstractResource {
  
 -}
 +    @GET
 +    @Produces(MediaType.APPLICATION_JSON)
 +    public Response getHealth() {
-         return ok(String.format("hello from node controller: %s", NodeControllerConfig.INSTANCE.getNodeControllerId()));
++        return ok(String.format("PONG: %s", NodeControllerConfig.INSTANCE.getNodeControllerId()));
 +    }
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
index 949d5e1,0000000..b3a01f2
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
@@@ -1,82 -1,0 +1,59 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
- package org.apache.streampipes.node.controller.container.rest;
++package org.apache.streampipes.node.controller.container.api;
 +
- import com.fasterxml.jackson.core.JsonProcessingException;
 +import org.apache.streampipes.model.node.NodeInfoDescription;
 +import org.apache.streampipes.node.controller.container.management.node.NodeManager;
- import org.apache.streampipes.node.controller.container.management.relay.EventRelay;
- import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
- import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
++import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;;
 +import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
 +import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
- import org.apache.streampipes.serializers.json.JacksonSerializer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
 +
 +import javax.ws.rs.*;
 +import javax.ws.rs.core.MediaType;
 +import javax.ws.rs.core.Response;
- import java.util.List;
- import java.util.stream.Collectors;
 +
- @Path("/api/v2/node")
++@Path("/api/v2/node/info")
 +public class InfoStatusResource extends AbstractResource {
-     private static final Logger LOG = LoggerFactory.getLogger(InfoStatusResource.class.getCanonicalName());
 +
 +    @GET
-     @Path("/info")
 +    @Produces(MediaType.APPLICATION_JSON)
 +    public Response getNodeInfo() {
 +        return ok(NodeManager.getInstance().retrieveNodeInfoDescription());
 +    }
 +
 +    @PUT
-     @Path("/update")
 +    @JacksonSerialized
 +    @Consumes(MediaType.APPLICATION_JSON)
 +    public Response updateNodeInfo(NodeInfoDescription desc) {
 +        return ok(NodeManager.getInstance().updateNodeInfoDescription(desc));
 +    }
 +
 +    @GET
-     @Path("/status")
++    @Path("/resources")
 +    @Produces(MediaType.APPLICATION_JSON)
 +    public Response getStatus() {
 +        return ok(ResourceManager.getInstance().retrieveNodeResources());
 +    }
 +
 +    @GET
-     @Path("/metrics")
++    @Path("/relays")
 +    @Produces(MediaType.APPLICATION_JSON)
-     public Response getMetrics() {
-         try {
-             List<RelayMetrics> metrics = RunningRelayInstances.INSTANCE.getRunningInstances()
-                     .stream()
-                     .map(EventRelay::getRelayMetrics)
-                     .collect(Collectors.toList());
- 
-             String metricsList = JacksonSerializer.getObjectMapper().writeValueAsString(metrics);
- 
-             return ok(metricsList);
-         } catch (JsonProcessingException e) {
-             e.printStackTrace();
-         }
-         return fail();
++    public Response getAllRelays() {
++        return ok(DataStreamRelayManager.getInstance().getAllRelays());
 +    }
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 0000000,0000000..0c71b57
new file mode 100644
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@@ -1,0 -1,0 +1,107 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *    http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ */
++package org.apache.streampipes.node.controller.container.api;
++
++import com.fasterxml.jackson.core.JsonProcessingException;
++import org.apache.streampipes.commons.exceptions.SpRuntimeException;
++import org.apache.streampipes.container.model.node.InvocableRegistration;
++import org.apache.streampipes.model.Response;
++import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
++import org.apache.streampipes.model.graph.DataProcessorInvocation;
++import org.apache.streampipes.model.graph.DataSinkInvocation;
++import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
++import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
++import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
++import org.apache.streampipes.serializers.json.JacksonSerializer;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import javax.ws.rs.*;
++import javax.ws.rs.core.MediaType;
++
++public abstract class InvocableEntityResource<I extends InvocableStreamPipesEntity> extends AbstractResource {
++    private static final Logger LOG = LoggerFactory.getLogger(InvocableEntityResource.class.getCanonicalName());
++
++    private static final String SLASH = "/";
++
++    protected Class<I> clazz;
++
++    public InvocableEntityResource(Class<I> clazz) {
++        this.clazz = clazz;
++    }
++
++    @POST
++    @Path("/register")
++    public void register(InvocableRegistration registration) {
++        InvocableElementManager.getInstance().register(registration);
++    }
++
++    @POST
++    @Path("{elementId}")
++    @Consumes(MediaType.APPLICATION_JSON)
++    @Produces(MediaType.APPLICATION_JSON)
++    public javax.ws.rs.core.Response invoke(@PathParam("elementId") String elementId, I graph) {
++        String endpoint;
++
++        if (graph instanceof DataProcessorInvocation) {
++            endpoint = graph.getBelongsTo();
++            DataStreamRelayManager.getInstance().startPipelineElementDataStreamRelay((DataProcessorInvocation) graph);
++            Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
++            if (resp.isSuccess()) {
++                RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
++            }
++            return ok(resp);
++        }
++        // Currently no data sinks are registered at node controller. If we, at some point, want to also run data
++        // sinks on edge nodes we need to register their Declarer at the node controller on startup.
++        else if (graph instanceof DataSinkInvocation) {
++            endpoint = graph.getBelongsTo();
++            Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
++            if (resp.isSuccess()) {
++                RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
++            }
++            return ok(resp);
++        }
++
++        return ok();
++    }
++
++    @DELETE
++    @Path("{elementId}/{runningInstanceId}")
++    @Produces(MediaType.APPLICATION_JSON)
++    public javax.ws.rs.core.Response detach(@PathParam("elementId") String elementId,
++                         @PathParam("runningInstanceId") String runningInstanceId) {
++        LOG.info("receive stop request elementId={}, runningInstanceId={}", elementId, runningInstanceId);
++
++        String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
++        Response resp = InvocableElementManager.getInstance().detach(endpoint + SLASH + runningInstanceId);
++        RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
++        DataStreamRelayManager.getInstance().stopPipelineElementDataStreamRelay(runningInstanceId);
++
++        return ok(resp);
++    }
++
++    private String toJson(I graph) {
++        try {
++            return JacksonSerializer.getObjectMapper().writeValueAsString(graph);
++        } catch (JsonProcessingException e) {
++            e.printStackTrace();
++        }
++        throw new SpRuntimeException("Could not serialize object: " + graph);
++    }
++}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
index 97b2dd2,630dcb5..8db6aba
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
@@@ -15,19 -15,20 +15,21 @@@
   * limitations under the License.
   *
   */
- package org.apache.streampipes.node.controller.container.rest;
++package org.apache.streampipes.node.controller.container.api;
  
 -import {Component, Input} from '@angular/core';
 -import {StreampipesPeContainerConifgs} from "../shared/streampipes-pe-container-configs";
 -import {ConfigurationService} from '../shared/configuration.service'
 +import org.glassfish.jersey.server.ResourceConfig;
 +import org.springframework.stereotype.Component;
  
 -@Component({
 -    selector: 'consul-configs-boolean',
 -    templateUrl: './consul-configs-boolean.component.html',
 -    styleUrls: ['./consul-configs-boolean.component.css'],
 -    providers: [ConfigurationService]
 -})
 -export class ConsulConfigsBooleanComponent {
 -    @Input() configuration: StreampipesPeContainerConifgs
 -    constructor(public configService:ConfigurationService) {
 -    }
 +@Component
 +public class NodeControllerResourceConfig extends ResourceConfig {
  
 -}
 +    public NodeControllerResourceConfig() {
 +        register(HealthCheckResource.class);
 +        register(InfoStatusResource.class);
-         register(InvocableEntityResource.class);
-         register(DataStreamRelayResource.class);
++        register(DataProcessorPipelineElementResource.class);
++        register(DataSinkPipelineElementResource.class);
++        register(AdapterDataStreamRelayResource.class);
 +        register(ConnectResource.class);
++        register(ContainerResource.class);
 +    }
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/connect/ConnectManager.java
index 3eee852,0000000..0756e1b
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/connect/ConnectManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/connect/ConnectManager.java
@@@ -1,225 -1,0 +1,224 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
 +package org.apache.streampipes.node.controller.container.management.connect;
 +
 +import com.fasterxml.jackson.core.JsonProcessingException;
 +import org.apache.http.HttpResponse;
 +import org.apache.http.client.fluent.Request;
 +import org.apache.http.client.fluent.Response;
 +import org.apache.http.entity.ContentType;
 +import org.apache.http.util.EntityUtils;
 +import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 +import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
 +import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 +import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
 +import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
 +import org.apache.streampipes.model.connect.guess.GuessSchema;
 +import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
 +import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 +import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 +import org.apache.streampipes.serializers.json.JacksonSerializer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +
 +public class ConnectManager {
- 
 +    private static final Logger LOG =
 +            LoggerFactory.getLogger(ConnectManager.class.getCanonicalName());
 +
 +    private static final String HTTP_PROTOCOL = "http://";
 +    private static final String COLON = ":";
 +    private static final String SLASH = "/";
 +    private static final String BACKEND_HOST = NodeControllerConfig.INSTANCE.getBackendHost();
 +    private static final int BACKEND_PORT = NodeControllerConfig.INSTANCE.getBackendPort();
 +    private static final String BACKEND_ADMINISTRATION_ROUTE = "/streampipes-backend/api/v2/connect/{username}/master" +
 +            "/administration";
 +
 +    // Connect adapter base route
 +    // TODO: get from registered extensions or connect adapter config
 +    private static final String CONNECT_WORKER_HOST = "localhost";
 +    private static final int CONNECT_WORKER_PORT = 7024;
 +    private static final String CONNECT_WORKER_BASE_ROUTE = "/api/v1/{username}/worker";
 +    private static final String STREAM_ROUTE = "/stream";
 +    private static final String SET_ROUTE = "/set";
 +    private static final String INVOKE_ROUTE = "/invoke";
 +    private static final String STOP_ROUTE ="/stop";
 +    private static final String GUESS_ROUTE = "/guess/schema";
 +    private static final String RESOLVABLE_ROUTE = "/resolvable/{id}/configurations";
 +    private static final String ADAPTER_ROUTE = "/adapters/{id}/assets";
 +    private static final String PROCOTOL_ROUTE = "/protocols/{id}/assets";
 +
 +    private static final Integer CONNECT_TIMEOUT = 10000;
 +    private static final Integer SOCKET_TIMEOUT = 100000;
 +    private static ConnectManager instance = null;
 +
 +    private ConnectManager() {}
 +
 +    public static ConnectManager getInstance() {
 +        if (instance == null) {
 +            synchronized (ConnectManager.class) {
 +                if (instance == null)
 +                    instance = new ConnectManager();
 +            }
 +        }
 +        return instance;
 +    }
 +
 +    // adapter -> backend communication: registration
 +
 +    // MasterRestClient
 +    public String register(String username, ConnectWorkerContainer wc) {
 +        String endpoint = (backendUrl() + BACKEND_ADMINISTRATION_ROUTE.replace("{username}", username));
 +        LOG.info("Trying to register connect worker at backend: " + endpoint);
 +        return post(endpoint , jackson(wc)).toString();
 +    }
 +
 +    // backend -> adapter communication
 +
 +    // WorkerResource
 +    public <T extends AdapterDescription> String invoke(String username, T ad) {
 +        LOG.info("Invoke adapter: appId=" + ad.getAppId() + ", name=" + ad.getName());
 +        if (ad instanceof AdapterStreamDescription) {
 +            return post(endpointFromDescription(username, ad, STREAM_ROUTE + INVOKE_ROUTE), jackson(ad)).toString();
 +        } else if (ad instanceof AdapterSetDescription) {
 +            return post(endpointFromDescription(username, ad, SET_ROUTE + INVOKE_ROUTE), jackson(ad)).toString();
 +        }
 +        throw new SpRuntimeException("Could not invoke adapter: " + ad.getAppId());
 +    }
 +
 +    public <T extends AdapterDescription> String stop(String username, T ad) {
 +        LOG.info("Stop adapter: appId=" + ad.getAppId() + ", name=" + ad.getName());
 +        if (ad instanceof AdapterStreamDescription) {
 +            return post(endpointFromDescription(username, ad, STREAM_ROUTE + STOP_ROUTE), jackson(ad)).toString();
 +        } else if (ad instanceof AdapterSetDescription) {
 +            return post(endpointFromDescription(username, ad, SET_ROUTE + STOP_ROUTE), jackson(ad)).toString();
 +        }
 +        throw new SpRuntimeException("Could not stop adapter: " + ad.getAppId());
 +    }
 +
 +    // GuessResource
 +    public GuessSchema guess(String username, AdapterDescription ad) {
 +        try {
 +            LOG.info("Trying to guess schema: " + ad.getAppId());
 +
 +            Response resp = post(endpointFromDescription(username, ad, GUESS_ROUTE), jackson(ad));
 +            HttpResponse httpResponse = resp.returnResponse();
 +            String responseString = EntityUtils.toString(httpResponse.getEntity());
 +            return JacksonSerializer.getObjectMapper().readValue(responseString, GuessSchema.class);
 +        } catch (IOException e) {
 +            e.printStackTrace();
 +        }
 +        throw new SpRuntimeException("Could not deserialize object");
 +    }
 +
 +    // RuntimeResolvableResource
 +    public String fetchConfigurations(String username, String appId, RuntimeOptionsRequest runtimeOptions) {
 +        String endpoint = endpointFromStringRoute(username, RESOLVABLE_ROUTE.replace("{id}", appId));
 +        LOG.info("Trying to fetch configurations at: " + endpoint);
 +        return post(endpoint, jackson(runtimeOptions)).toString();
 +    }
 +
 +    // AdapterResource
 +    public byte[] assets(String username, String appId, String assetType, String subroute) {
 +        String endpoint = "";
 +        if ("adapter".equals(assetType)) {
 +            if (subroute.isEmpty()) {
 +                endpoint = endpointFromStringRoute(username, ADAPTER_ROUTE.replace("{id}", appId));
 +            } else {
 +                endpoint = endpointFromStringRoute(username, (ADAPTER_ROUTE.replace("{id}", appId) + subroute));
 +            }
 +        } else if ("protocol".equals(assetType)) {
 +            if (subroute.isEmpty()) {
 +                endpoint = endpointFromStringRoute(username, PROCOTOL_ROUTE.replace("{id}", appId));
 +            } else {
 +                endpoint = endpointFromStringRoute(username, (PROCOTOL_ROUTE.replace("{id}", appId) + subroute));
 +            }
 +        }
 +        return get(endpoint);
 +    }
 +
 +    // Helper methods
 +    private Response post(String endpoint, String payload) {
 +        try {
 +            return Request.Post(endpoint)
 +                    .bodyString(payload, ContentType.APPLICATION_JSON)
 +                    .connectTimeout(CONNECT_TIMEOUT)
 +                    .socketTimeout(SOCKET_TIMEOUT)
 +                    .execute();
 +        } catch (IOException e) {
 +            e.printStackTrace();
 +        }
 +        throw new SpRuntimeException("Post request was not successful");
 +    }
 +
 +    private byte[] get(String endpoint) {
 +        try {
 +            return Request.Get(endpoint)
 +                    .connectTimeout(CONNECT_TIMEOUT)
 +                    .socketTimeout(SOCKET_TIMEOUT)
 +                    .execute().returnContent().asBytes();
 +        } catch (IOException e) {
 +            e.printStackTrace();
 +        }
 +        throw new SpRuntimeException("Get request was not successful");
 +    }
 +
 +    private <T extends AdapterDescription> String jackson(T ad) {
 +        try {
 +            return JacksonSerializer.getObjectMapper().writeValueAsString(ad);
 +        } catch (JsonProcessingException e) {
 +            e.printStackTrace();
 +        }
 +        throw new SpRuntimeException("Could not serialize object");
 +    }
 +
 +    private <T extends UnnamedStreamPipesEntity> String jackson(T ad) {
 +        try {
 +            return JacksonSerializer.getObjectMapper().writeValueAsString(ad);
 +        } catch (JsonProcessingException e) {
 +            e.printStackTrace();
 +        }
 +        throw new SpRuntimeException("Could not serialize object");
 +    }
 +
 +    private <T extends AdapterDescription> String endpointFromDescription(String username, T ad, String subroute) {
 +        return workerUrl(ad) + addUserToBaseRoute(username) + subroute;
 +    }
 +
 +    private String endpointFromStringRoute(String username, String subroute) {
 +        return workerUrl() + addUserToBaseRoute(username) + subroute;
 +    }
 +
 +    private String addUserToBaseRoute(String username) {
 +        return CONNECT_WORKER_BASE_ROUTE.replace("{username}", username);
 +    }
 +
 +
 +    private String backendUrl() {
 +        return HTTP_PROTOCOL + BACKEND_HOST + COLON + BACKEND_PORT;
 +    }
 +
 +    private String workerUrl() {
 +        return HTTP_PROTOCOL + CONNECT_WORKER_HOST + COLON + CONNECT_WORKER_PORT;
 +    }
 +
 +    private <T extends AdapterDescription> String workerUrl(T ad) {
 +        return HTTP_PROTOCOL + ad.getElementEndpointHostname() + COLON + ad.getElementEndpointPort();
 +    }
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index d89fddd,0000000..692f3b4
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@@ -1,188 -1,0 +1,186 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
 +package org.apache.streampipes.node.controller.container.management.pe;
 +
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.google.gson.Gson;
- import org.apache.http.client.ClientProtocolException;
++import com.google.gson.JsonSyntaxException;
 +import org.apache.http.client.fluent.Request;
- import org.apache.http.client.fluent.Response;
 +import org.apache.http.entity.ContentType;
 +import org.apache.http.entity.StringEntity;
 +import org.apache.streampipes.container.model.node.InvocableRegistration;
++import org.apache.streampipes.model.Response;
 +import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 +import org.apache.streampipes.node.controller.container.management.node.NodeManager;
 +import org.apache.streampipes.serializers.json.JacksonSerializer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.util.Collections;
 +
 +public class InvocableElementManager implements InvocableLifeCycle {
 +
 +    private static final Logger LOG =
 +            LoggerFactory.getLogger(InvocableElementManager.class.getCanonicalName());
 +
 +    private static final String HTTP_PROTOCOL = "http://";
 +    private static final String COLON = ":";
 +    private static final String SLASH = "/";
 +    private static final String ENV_CONSUL_LOCATION = "CONSUL_LOCATION";
 +    private static final Integer CONNECT_TIMEOUT = 10000;
 +    private static InvocableElementManager instance = null;
 +
 +    private InvocableElementManager() {}
 +
 +    public static InvocableElementManager getInstance() {
 +        if (instance == null) {
 +            synchronized (InvocableElementManager.class) {
 +                if (instance == null)
 +                    instance = new InvocableElementManager();
 +            }
 +        }
 +        return instance;
 +    }
 +
 +    @Override
 +    public void register(InvocableRegistration registration) {
 +        try {
 +            Request.Put(makeConsulRegistrationEndpoint())
 +                    .addHeader("accept", "application/json")
 +                    .body(new StringEntity(JacksonSerializer
 +                            .getObjectMapper()
 +                            .writeValueAsString(registration.getConsulServiceRegistrationBody())))
 +                    .execute();
 +
 +            // TODO: persistent storage to survive failures
 +            NodeManager.getInstance()
 +                    .retrieveNodeInfoDescription()
 +                    .setSupportedElements(registration.getSupportedPipelineElementAppIds());
 +
 +            String url = "http://"
 +                            + NodeControllerConfig.INSTANCE.getBackendHost()
 +                            + ":"
 +                            + NodeControllerConfig.INSTANCE.getBackendPort()
 +                            + "/"
 +                            + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes"
 +                            + "/"
 +                            + NodeControllerConfig.INSTANCE.getNodeControllerId();
 +
 +            String desc = JacksonSerializer.getObjectMapper()
 +                    .writeValueAsString(NodeManager.getInstance().retrieveNodeInfoDescription());
 +
 +            Request.Put(url)
 +                    .bodyString(desc, ContentType.APPLICATION_JSON)
 +//                    .connectTimeout(1000)
 +//                    .socketTimeout(100000)
 +                    .execute();
 +
 +            LOG.info("Successfully registered pipeline element container");
 +        } catch (IOException e) {
 +            e.printStackTrace();
 +        }
 +    }
 +
 +    @Override
-     public org.apache.streampipes.model.Response invoke(String endpoint, String payload) {
++    public Response invoke(String endpoint, String payload) {
 +        LOG.info("Invoke pipeline element: {}", endpoint);
 +        try {
-             Response httpResp = Request
++            org.apache.http.client.fluent.Response httpResp = Request
 +                    .Post(endpoint)
 +                    .bodyString(payload, ContentType.APPLICATION_JSON)
 +                    .connectTimeout(CONNECT_TIMEOUT)
 +                    .execute();
- 
-             return new Gson().fromJson(httpResp.returnContent().asString(),
-                     org.apache.streampipes.model.Response.class);
- 
++            return handleResponse(httpResp);
 +        } catch (Exception e) {
 +            LOG.error(e.getMessage());
 +        }
 +        throw new RuntimeException("Failed to invoke pipeline element: " + endpoint);
 +    }
 +
 +    @Override
-     public String detach(String endpoint) {
++    public Response detach(String endpoint) {
 +        LOG.info("Detach pipeline element: {}", endpoint);
 +        try {
-             Response httpResp = Request
++            org.apache.http.client.fluent.Response httpResp = Request
 +                    .Delete(endpoint)
 +                    .connectTimeout(CONNECT_TIMEOUT)
 +                    .execute();
- 
-             String resp = httpResp.returnContent().asString();
-             org.apache.streampipes.model.Response streamPipesResp = new Gson().fromJson(resp,
-                     org.apache.streampipes.model.Response.class);
- 
-             return streamPipesResp.toString();
++            return handleResponse(httpResp);
 +        } catch (Exception e) {
 +            LOG.error(e.getMessage());
 +        }
 +        throw new IllegalArgumentException("Failed to detach pipeline element: " + endpoint);
 +    }
 +
 +    @Override
 +    public void unregister(){
 +        // TODO: also unregister element from Consul
 +        NodeManager.getInstance()
 +                .retrieveNodeInfoDescription()
 +                .setSupportedElements(Collections.emptyList());
 +
 +        String url = "http://"
 +                + NodeControllerConfig.INSTANCE.getBackendHost()
 +                + ":"
 +                + NodeControllerConfig.INSTANCE.getBackendPort()
 +                + "/"
 +                + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes"
 +                + "/"
 +                + NodeControllerConfig.INSTANCE.getNodeControllerId();
 +
 +        try {
 +            String desc = JacksonSerializer.getObjectMapper()
 +                    .writeValueAsString(NodeManager.getInstance().retrieveNodeInfoDescription());
 +
 +            Request.Put(url)
 +                    .bodyString(desc, ContentType.APPLICATION_JSON)
 +                    .connectTimeout(1000)
 +                    .socketTimeout(100000)
 +                    .execute();
 +
 +        } catch (IOException e) {
 +            e.printStackTrace();
 +        }
 +    }
 +
++    private Response handleResponse(org.apache.http.client.fluent.Response httpResp) throws JsonSyntaxException,
++            IOException {
++        String resp = httpResp.returnContent().asString();
++        return JacksonSerializer
++                .getObjectMapper()
++                .readValue(resp, Response.class);
++    }
++
 +    private String makeConsulRegistrationEndpoint() {
 +        if (System.getenv(ENV_CONSUL_LOCATION) != null) {
 +            return HTTP_PROTOCOL
 +                    + System.getenv(ENV_CONSUL_LOCATION)
 +                    + COLON
 +                    + "8500"
 +                    + SLASH
 +                    + "v1/agent/service/register";
 +        } else {
 +            return HTTP_PROTOCOL
 +                    + "localhost"
 +                    + COLON
 +                    + "8500"
 +                    + SLASH
 +                    + "v1/agent/service/register";
 +        }
 +    }
 +
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java
index 2782cce,c5dce8d..168d998
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java
@@@ -15,19 -15,32 +15,19 @@@
   * limitations under the License.
   *
   */
 +package org.apache.streampipes.node.controller.container.management.pe;
  
 -@import '../../../../scss/sp/sp-dialog.scss';
 +import org.apache.streampipes.container.model.node.InvocableRegistration;
 +import org.apache.streampipes.model.Response;
  
 -.customize-section {
 -  display:flex;
 -  flex: 1 1 auto;
 -  padding: 20px;
 -}
 +public interface InvocableLifeCycle {
  
 -.padding-20 {
 -  padding: 20px;
 -}
 +    void register(InvocableRegistration registration);
  
 -.mb-10 {
 -  margin-bottom: 10px;
 -}
 +    Response invoke(String endpoint, String payload);
  
-     String detach(String runningInstanceId);
 -::ng-deep .pipeline-radio-group .mat-radio-label {
 -  padding: 0;
 -}
++    Response detach(String runningInstanceId);
  
 -.status-text {
 -  font-size: 14pt;
 -  margin-top:10px;
 -}
 +    void unregister();
  
 -.status-subtext {
 -  font-size: 12pt;
 -}
 +}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
index 0000000,0000000..ccfa9c6
new file mode 100644
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
@@@ -1,0 -1,0 +1,109 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *    http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ */
++package org.apache.streampipes.node.controller.container.management.relay;
++
++import com.fasterxml.jackson.core.JsonProcessingException;
++import org.apache.streampipes.model.Response;
++import org.apache.streampipes.model.SpDataStreamRelay;
++import org.apache.streampipes.model.SpDataStreamRelayContainer;
++import org.apache.streampipes.model.graph.DataProcessorInvocation;
++import org.apache.streampipes.model.grounding.TransportProtocol;
++import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
++
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++import java.util.stream.Collectors;
++
++public class DataStreamRelayManager {
++
++    private static DataStreamRelayManager instance = null;
++
++    private DataStreamRelayManager() {}
++
++    public static DataStreamRelayManager getInstance() {
++        if (instance == null) {
++            synchronized (DataStreamRelayManager.class) {
++                if (instance == null)
++                    instance = new DataStreamRelayManager();
++            }
++        }
++        return instance;
++    }
++
++    public Response startAdapterDataStreamRelay(SpDataStreamRelayContainer desc) {
++        String strategy = desc.getEventRelayStrategy();
++        String runningInstanceId = desc.getRunningStreamRelayInstanceId();
++        TransportProtocol source = desc.getInputGrounding().getTransportProtocol();
++
++        Map<String, EventRelay> eventRelayMap = new HashMap<>();
++
++        desc.getOutputStreamRelays().forEach(r -> {
++            TransportProtocol target = r.getEventGrounding().getTransportProtocol();
++            EventRelay eventRelay = new EventRelay(source, target, strategy);
++            eventRelay.start();
++            eventRelayMap.put(r.getElementId(), eventRelay);
++        });
++        RunningRelayInstances.INSTANCE.add(desc.getRunningStreamRelayInstanceId(), eventRelayMap);
++        return new Response(runningInstanceId,true,"");
++    }
++
++    public Response stopAdapterDataStreamRelay(String id) {
++        Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(id);
++        if (relay != null) {
++            relay.values().forEach(EventRelay::stop);
++        }
++        RunningRelayInstances.INSTANCE.remove(id);
++        return new Response(id, true, "");
++    }
++
++    public void startPipelineElementDataStreamRelay(DataProcessorInvocation graph) {
++        TransportProtocol source = graph
++                .getOutputStream()
++                .getEventGrounding()
++                .getTransportProtocol();
++
++        String strategy = graph.getEventRelayStrategy();
++        Map<String, EventRelay> eventRelayMap = new HashMap<>();
++
++        List<SpDataStreamRelay> dataStreamRelays = graph.getOutputStreamRelays();
++        dataStreamRelays.forEach(r -> {
++            TransportProtocol target = r.getEventGrounding().getTransportProtocol();
++            EventRelay eventRelay = new EventRelay(source, target, strategy);
++            eventRelay.start();
++            eventRelayMap.put(r.getElementId(), eventRelay);
++        });
++        RunningRelayInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), eventRelayMap);
++    }
++
++    public void stopPipelineElementDataStreamRelay(String id) {
++        // Stop relay for invocable if existing
++        Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(id);
++        if (relay != null) {
++            relay.values().forEach(EventRelay::stop);
++        }
++        RunningRelayInstances.INSTANCE.remove(id);
++    }
++
++    public List<RelayMetrics> getAllRelays() {
++       return RunningRelayInstances.INSTANCE.getRunningInstances()
++                    .stream()
++                    .map(EventRelay::getRelayMetrics)
++                    .collect(Collectors.toList());
++    }
++}
diff --cc streampipes-pipeline-management/pom.xml
index 34d43de,d9444f2..5cf9721
--- a/streampipes-pipeline-management/pom.xml
+++ b/streampipes-pipeline-management/pom.xml
@@@ -99,6 -94,11 +99,16 @@@
          </dependency>
          <dependency>
              <groupId>org.apache.streampipes</groupId>
+             <artifactId>streampipes-serializers-json</artifactId>
+             <version>0.68.0-SNAPSHOT</version>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.streampipes</groupId>
++            <artifactId>streampipes-serializers-jsonld</artifactId>
++            <version>0.68.0-SNAPSHOT</version>
++        </dependency>
++        <dependency>
++            <groupId>org.apache.streampipes</groupId>
              <artifactId>streampipes-storage-management</artifactId>
              <version>0.68.0-SNAPSHOT</version>
          </dependency>
diff --cc streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index f6706eb,2077607..35509cb
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@@ -24,23 -22,18 +24,23 @@@ import com.google.gson.JsonSyntaxExcept
  import org.apache.http.client.fluent.Request;
  import org.apache.http.client.fluent.Response;
  import org.apache.http.entity.ContentType;
 +import org.apache.streampipes.commons.Utils;
 +import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
  import org.apache.streampipes.model.base.NamedStreamPipesEntity;
  import org.apache.streampipes.model.pipeline.PipelineElementStatus;
--import org.apache.streampipes.serializers.json.JacksonSerializer;
- import org.apache.streampipes.serializers.jsonld.JsonLdTransformer;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
++import org.apache.streampipes.serializers.json.JacksonSerializer;
++import org.apache.streampipes.serializers.jsonld.JsonLdTransformer;
  
  import java.io.IOException;
  
  public class HttpRequestBuilder {
  
-   private NamedStreamPipesEntity payload;
-   private String endpointUrl;
+   private final NamedStreamPipesEntity payload;
 -  private final String belongsTo;
++  private final String endpointUrl;
 +
 +  private static final Integer CONNECT_TIMEOUT = 10000;
  
    private final static Logger LOG = LoggerFactory.getLogger(HttpRequestBuilder.class);
  
@@@ -50,20 -43,11 +50,18 @@@
    }
  
    public PipelineElementStatus invoke() {
 -    LOG.info("Invoking element: " + belongsTo);
      try {
-       String json;
 -      String jsonDocument = toJson();
 -      Response httpResp =
 -              Request.Post(belongsTo).bodyString(jsonDocument, ContentType.APPLICATION_JSON).connectTimeout(10000).execute();
 +      if (payload instanceof InvocableStreamPipesEntity) {
 +        LOG.info("Invoking pipeline element: " + endpointUrl);
-         json = jsonLd();
 +      } else {
 +        LOG.info("Invoking data stream relay: " + endpointUrl);
-         json = jackson();
 +      }
++      String json = toJson();
 +      Response httpResp = Request
 +              .Post(endpointUrl)
 +              .bodyString(json, ContentType.APPLICATION_JSON)
 +              .connectTimeout(CONNECT_TIMEOUT)
 +              .execute();
        return handleResponse(httpResp);
      } catch (Exception e) {
        LOG.error(e.getMessage());
@@@ -72,21 -56,12 +70,20 @@@
    }
  
    public PipelineElementStatus detach() {
 +    if (payload instanceof InvocableStreamPipesEntity) {
 +      LOG.info("Detaching pipeline element: " + endpointUrl);
 +    } else {
 +      LOG.info("Detaching data stream relay: " + endpointUrl);
 +    }
- 
      try {
 -      Response httpResp = Request.Delete(belongsTo).connectTimeout(10000).execute();
 +      Response httpResp = Request
 +              .Delete(endpointUrl)
 +              .connectTimeout(CONNECT_TIMEOUT)
 +              .execute();
        return handleResponse(httpResp);
      } catch (Exception e) {
 -      LOG.error("Could not stop pipeline " + belongsTo, e.getMessage());
 -      return new PipelineElementStatus(belongsTo, payload.getName(), false, e.getMessage());
 +      LOG.error("Could not stop pipeline " + endpointUrl, e.getMessage());
 +      return new PipelineElementStatus(endpointUrl, payload.getName(), false, e.getMessage());
      }
    }
  
@@@ -96,15 -73,11 +95,11 @@@
      return convert(streamPipesResp);
    }
  
-   private String jsonLd() throws Exception {
-     return Utils.asString(new JsonLdTransformer().toJsonLd(payload));
-   }
- 
 -  private String toJson() throws Exception {
 -    return JacksonSerializer.getObjectMapper().writeValueAsString(payload);
 +  private PipelineElementStatus convert(org.apache.streampipes.model.Response response) {
 +    return new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(), response.getOptionalMessage());
    }
  
-   private String jackson() throws JsonProcessingException {
 -  private PipelineElementStatus convert(org.apache.streampipes.model.Response response) {
 -    return new PipelineElementStatus(belongsTo, payload.getName(), response.isSuccess(), response.getOptionalMessage());
++  private String toJson() throws Exception {
 +    return JacksonSerializer.getObjectMapper().writeValueAsString(payload);
    }
  }
diff --cc streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
index 814f026,0000000..1b4cb35
mode 100644,000000..100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
@@@ -1,105 -1,0 +1,106 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
 +package org.apache.streampipes.manager.execution.http;
 +
 +import org.apache.streampipes.config.consul.ConsulSpConfig;
 +import org.apache.streampipes.container.util.ConsulUtil;
 +import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 +import org.apache.streampipes.model.graph.DataProcessorInvocation;
 +
 +public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableStreamPipesEntity> { // builds the invoke and detach endpoint URLs for one invocable pipeline element (graph)
 +
++    private static final String DATA_PROCESSOR_PREFIX = "sepa"; // URL path segment used when graph is a DataProcessorInvocation
++    private static final String DATA_SINK_PREFIX = "sec"; // URL path segment used for every other invocable entity
 +    private static final String DEFAULT_TARGET_NODE_ID = "default"; // target node id that, like null, selects the default endpoint
-     private static final String INVOKE_ROUTE = "api/v2/node/container/invoke";
-     private static final String DETACH_ROUTE = "api/v2/node/container/detach";
++    private static final String ELEMENT_ROUTE = "api/v2/node/element"; // route on the deployment target node, shared by invoke and detach URLs
 +
 +    public InvocableEntityUrlGenerator(InvocableStreamPipesEntity graph) {
 +        super(graph);
 +    }
 +
 +    @Override
 +    public String generateInvokeEndpoint() { // returns graph.getBelongsTo() for the default target, otherwise a URL on the deployment target node
 +        if (isDefaultTarget()) {
 +            // default deployments to primary pipeline element
 +            return getDefaultEndpoint();
 +        } else {
 +            // edge deployments to secondary pipeline element
-             return getDeploymentTargetEndpoint(INVOKE_ROUTE);
++            return getDeploymentTargetEndpoint(ELEMENT_ROUTE);
 +        }
 +    }
 +
 +    @Override
 +    public String generateDetachEndpoint() { // same base URL as the invoke endpoint, followed by SLASH and the graph's deployment running instance id
 +        if (isDefaultTarget()) {
 +            // detach primary pipeline element
 +            return getDefaultEndpoint() + SLASH + graph.getDeploymentRunningInstanceId();
 +        } else {
 +            // detach edge deployments to secondary pipeline element
-             return getDeploymentTargetEndpoint(DETACH_ROUTE) + SLASH + graph.getDeploymentRunningInstanceId();
++            return getDeploymentTargetEndpoint(ELEMENT_ROUTE) + SLASH + graph.getDeploymentRunningInstanceId();
 +        }
 +    }
 +
 +    // Helper methods
 +
 +    private boolean isDefaultTarget() { // true when no deployment target node id is set or it equals DEFAULT_TARGET_NODE_ID
 +        return graph.getDeploymentTargetNodeId() == null ||
 +                graph.getDeploymentTargetNodeId().equals(DEFAULT_TARGET_NODE_ID);
 +    }
 +
 +    private String getDefaultEndpoint() { // the default endpoint is whatever URL is stored in graph.getBelongsTo()
 +        return graph.getBelongsTo();
 +    }
 +
 +    private String getDeploymentTargetEndpoint(String route) { // NOTE: not a pure getter - it first mutates graph through modifyInvocableElement()
 +        modifyInvocableElement();
 +        return HTTP_PROTOCOL + graph.getDeploymentTargetNodeHostname() + COLON + graph.getDeploymentTargetNodePort()
 +                + SLASH
 +                + route
 +                + SLASH
 +                + getIdentifier()
 +                + SLASH
 +                + graph.getAppId();
 +    }
 +
 +    private void modifyInvocableElement() { // overwrites the graph's element endpoint host/port, belongsTo and elementId with values looked up through ConsulUtil
 +        // Necessary because secondary pipeline element description is not stored in backend
 +        // It uses information from primary pipeline element. Node controller will locally forward
 +        // request accordingly, thus fields must be correct.
 +        String route = ConsulSpConfig.SERVICE_ROUTE_PREFIX
 +                + graph.getElementEndpointServiceName()
 +                + SLASH
 +                + ConsulSpConfig.BASE_PREFIX
 +                + SLASH
 +                + ConsulSpConfig.SECONDARY_NODE_KEY
 +                + SLASH
 +                + graph.getDeploymentTargetNodeId()
 +                + SLASH;
 +
 +        String host = ConsulUtil.getValueForRoute(route + "SP_HOST", String.class);
 +        int port = ConsulUtil.getValueForRoute(route + "SP_PORT", Integer.class); // NOTE(review): presumably returns an Integer that is auto-unboxed here; a null result would throw NullPointerException - confirm against ConsulUtil
 +        graph.setElementEndpointHostname(host);
 +        graph.setElementEndpointPort(port);
 +        graph.setBelongsTo(HTTP_PROTOCOL + host + COLON + port + SLASH + getIdentifier() + SLASH + graph.getAppId());
 +        graph.setElementId(graph.getBelongsTo() + SLASH + graph.getDeploymentRunningInstanceId()); // relies on the belongsTo value assigned on the previous line
 +    }
 +
 +    private String getIdentifier() { // path segment that distinguishes data processors from all other invocable entities
-         return graph instanceof DataProcessorInvocation ? "sepa" : "sec";
++        return graph instanceof DataProcessorInvocation ? DATA_PROCESSOR_PREFIX : DATA_SINK_PREFIX;
 +    }
 +
 +}
diff --cc streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
index c49a53f,0000000..d37e23a
mode 100644,000000..100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
@@@ -1,84 -1,0 +1,84 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
 +package org.apache.streampipes.manager.node;
 +
 +import com.fasterxml.jackson.core.JsonProcessingException;
 +import org.apache.http.client.ClientProtocolException;
 +import org.apache.http.client.fluent.Request;
 +import org.apache.http.entity.ContentType;
 +import org.apache.streampipes.model.node.NodeInfoDescription;
 +import org.apache.streampipes.serializers.json.JacksonSerializer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +
 +public enum NodeClusterManager {
 +    INSTANCE;
 +
 +
 +    private static final Logger LOG =
 +            LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
 +
 +
 +    public boolean updateNodeInfoDescription(NodeInfoDescription desc) {
 +        boolean successfullyUpdated = false;
 +        try {
 +            String body = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
 +            String url = makeNodeControllerEndpoint(desc);
 +
 +            LOG.info("Trying to update description for node controller: " + url);
 +
 +            boolean connected = false;
 +            while (!connected) {
 +                connected = put(url, body);
 +
 +                if (!connected) {
 +                    LOG.info("Retrying in 5 seconds");
 +                    try {
 +                        Thread.sleep(5000);
 +                    } catch (InterruptedException e) {
 +                        e.printStackTrace();
 +                    }
 +                }
 +            }
 +            successfullyUpdated = true;
 +        } catch (IOException e) {
 +            e.printStackTrace();
 +        }
 +        return successfullyUpdated;
 +    }
 +
 +    private String makeNodeControllerEndpoint(NodeInfoDescription desc) {
-         return "http://" + desc.getHostname() + ":" + desc.getPort() + "/api/v2/node/update";
++        return "http://" + desc.getHostname() + ":" + desc.getPort() + "/api/v2/node/info";
 +    }
 +
 +    private boolean put(String url, String body) {
 +        try {
 +            Request.Put(url)
 +                    .bodyString(body, ContentType.APPLICATION_JSON)
 +                    .connectTimeout(1000)
 +                    .socketTimeout(100000)
 +                    .execute();
 +            return true;
 +        } catch (IOException e) {
 +            e.printStackTrace();
 +        }
 +        return false;
 +    }
 +}
diff --cc ui/package.json
index 31a9149,3808ec0..40c64e2
--- a/ui/package.json
+++ b/ui/package.json
@@@ -36,11 -36,11 +36,11 @@@
      "@ngx-loading-bar/core": "5.1.0",
      "@ngx-loading-bar/http-client": "5.1.0",
      "@stomp/ng2-stompjs": "7.2.0",
-     "@swimlane/ngx-charts": "13.0.2",
-     "angular-datatables": "9.0.2",
+     "@swimlane/ngx-charts": "16.0.0",
+     "angular-datatables": "^10.0.0",
      "angular-gridster2": "8.3.0",
      "angular-loading-bar": "0.8.0",
 -    "angular-material-icons": "0.4.0",
 +    "angular-material-icons": "^0.4.0",
      "angular-plotly.js": "1.5.0",
      "angular-tree-component": "8.5.6",
      "angular-ui-tree": "2.9.0",
diff --cc ui/src/app/platform-services/platform.module.ts
index b09c0ad,85962e5..bc6f440
--- a/ui/src/app/platform-services/platform.module.ts
+++ b/ui/src/app/platform-services/platform.module.ts
@@@ -22,7 -22,7 +22,8 @@@ import {PipelineService} from "./apis/p
  import {PlatformServicesCommons} from "./apis/commons.service";
  import {PipelineElementEndpointService} from "./apis/pipeline-element-endpoint.service";
  import {FilesService} from "./apis/files.service";
 +import {NodeService} from "./apis/node.service";
+ import {MeasurementUnitsService} from "./apis/measurement-units.service";
  
  @NgModule({
    imports: [],