You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/05/12 07:07:19 UTC

[incubator-streampipes] 01/03: [WIP] refactoring node-controller

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit bdfecf03daf33a6e1381f2e5ed27db56ba423177
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon May 10 23:24:07 2021 +0200

    [WIP] refactoring node-controller
---
 .../extensions/ExtensionsModelSubmitter.java       |   8 +-
 .../api/ContainerDeploymentResource.java           |   4 +-
 .../api/NodeInfoDescriptionResource.java           |   8 +-
 .../container/DockerExtensionsContainer.java       |   1 -
 .../controller/management/node/INodeManager.java   |  34 ++++
 .../controller/management/node/NodeManager.java    | 199 +++++++--------------
 .../orchestrator/DockerEngineManager.java          |   2 +-
 .../docker/AbstractStreamPipesDockerContainer.java |  17 +-
 .../orchestrator/docker/utils/DockerUtils.java     |  41 ++++-
 .../management/pe/InvocableElementManager.java     |   4 +-
 .../node/controller/utils/HttpUtils.java           | 143 +++++++++++++++
 .../node/controller/utils/SocketUtils.java         |  37 ++++
 12 files changed, 341 insertions(+), 157 deletions(-)

diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 56a6004..cbf2ed0 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -103,12 +103,12 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<EdgeExtens
 
             boolean connected = false;
             while (!connected) {
-                LOG.info("Trying to connect to the node controller: " + nodeControllerUrl);
+                LOG.debug("Trying to connect to the node controller: " + nodeControllerUrl);
                 connected = NodeControllerRestClient.register(nodeControllerUrl,
                         getContainerDescription(conf, true));
 
                 if (!connected) {
-                    LOG.info("Retrying in 5 seconds");
+                    LOG.debug("Retrying in 5 seconds");
                     try {
                         Thread.sleep(5000);
                     } catch (InterruptedException e) {
@@ -129,12 +129,12 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<EdgeExtens
 
             boolean connected = false;
             while (!connected) {
-                LOG.info("Trying to connect to master in backend: " + backendUrl);
+                LOG.debug("Trying to connect to master in backend: " + backendUrl);
                 connected = MasterRestClient.register(backendUrl,
                         getContainerDescription(conf,  false));
 
                 if (!connected) {
-                    LOG.info("Retrying in 5 seconds");
+                    LOG.debug("Retrying in 5 seconds");
                     try {
                         Thread.sleep(5000);
                     } catch (InterruptedException e) {
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/ContainerDeploymentResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/ContainerDeploymentResource.java
index cc20cb0..42c2ab6 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/ContainerDeploymentResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/ContainerDeploymentResource.java
@@ -51,7 +51,7 @@ public class ContainerDeploymentResource extends AbstractResource {
         ContainerDeploymentStatus status = DockerEngineManager.getInstance().deploy(container);
 
         if (status.getStatus() == ContainerStatus.DEPLOYED) {
-            NodeManager.getInstance().addToRegisteredContainers(status.getContainer());
+            NodeManager.getInstance().registerContainer(status.getContainer());
         }
         return ok(status);
     }
@@ -64,7 +64,7 @@ public class ContainerDeploymentResource extends AbstractResource {
 
         if (status.getStatus() == ContainerStatus.REMOVED) {
             InvocableElementManager.getInstance().unregister();
-            NodeManager.getInstance().removeFromRegisteredContainers(status.getContainer());
+            NodeManager.getInstance().deregisterContainer(status.getContainer());
         }
         return ok(status);
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java
index d37e246..ed11292 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/NodeInfoDescriptionResource.java
@@ -36,7 +36,7 @@ public class NodeInfoDescriptionResource extends AbstractResource {
     @GET
     @Produces(MediaType.APPLICATION_JSON)
     public Response getNodeInfo() {
-        return ok(NodeManager.getInstance().retrieveNodeInfoDescription());
+        return ok(NodeManager.getInstance().getNode());
     }
 
     @PUT
@@ -44,7 +44,7 @@ public class NodeInfoDescriptionResource extends AbstractResource {
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     public Response updateNodeInfo(NodeInfoDescription desc) {
-        return ok(NodeManager.getInstance().updateNodeInfoDescription(desc));
+        return ok(NodeManager.getInstance().updateNode(desc));
     }
 
     @POST
@@ -54,9 +54,9 @@ public class NodeInfoDescriptionResource extends AbstractResource {
     @Produces(MediaType.APPLICATION_JSON)
     public Response activateNode(@PathParam("action") String action) {
         if (action.equals(ACTIVATE)) {
-            return ok(NodeManager.getInstance().activate());
+            return ok(NodeManager.getInstance().activateNode());
         } else if (action.equals(DEACTIVATE)) {
-            return ok(NodeManager.getInstance().deactivate());
+            return ok(NodeManager.getInstance().deactivateNode());
         } else return fail();
     }
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
index b4d6433..a455714 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/container/DockerExtensionsContainer.java
@@ -38,7 +38,6 @@ public class DockerExtensionsContainer extends AbstractStreamPipesDockerContaine
                 .withExposedPorts(Ports.withMapping("8090"))
                 .withEnvironmentVariables(ContainerEnvBuilder.create()
                         .addNodeEnvs(generateStreamPipesNodeEnvs())
-                        .add("CONSUL_LOCATION", NodeConfiguration.getConsulHost())
                         .add("SP_HOST", NodeConfiguration.getNodeHost())
                         .add("SP_PORT", "8090")
                         .build())
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/INodeManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/INodeManager.java
new file mode 100644
index 0000000..e5d799d
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/INodeManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.management.node;
+
+import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.model.node.container.DockerContainer;
+
+public interface INodeManager {
+    void init();
+    void addNode(NodeInfoDescription node);
+    NodeInfoDescription getNode();
+    Message updateNode(NodeInfoDescription node);
+    Message activateNode();
+    Message deactivateNode();
+    boolean registerNode();
+    void registerDeployedContainer(DockerContainer container);
+    void deregisterDeployContainer(DockerContainer container);
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/NodeManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/NodeManager.java
index fee44d0..8d5fe95 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/NodeManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/node/NodeManager.java
@@ -17,19 +17,17 @@
  */
 package org.apache.streampipes.node.controller.management.node;
 
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
-import org.apache.http.entity.ContentType;
 import org.apache.streampipes.model.client.version.VersionInfo;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.message.NotificationType;
 import org.apache.streampipes.model.message.Notifications;
-import org.apache.streampipes.model.message.SuccessMessage;
 import org.apache.streampipes.model.node.*;
 import org.apache.streampipes.model.node.container.DeploymentContainer;
 import org.apache.streampipes.model.node.container.DockerContainer;
 import org.apache.streampipes.model.node.meta.GeoLocation;
 import org.apache.streampipes.node.controller.config.NodeConfiguration;
+import org.apache.streampipes.node.controller.utils.HttpUtils;
+import org.apache.streampipes.node.controller.utils.SocketUtils;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,15 +40,11 @@ import java.util.List;
 public class NodeManager {
     private static final Logger LOG = LoggerFactory.getLogger(NodeManager.class.getCanonicalName());
 
-    private static final String PROTOCOL = "http://";
-    private static final String SLASH = "/";
-    private static final String COLON = ":";
-    private static final String BACKEND_BASE_ROUTE = "/streampipes-backend";
-    private static final String BACKEND_VERSION_ROUTE = "/api/v2/info/versions";
-    private static final String NODE_REGISTRATION_ROUTE = "/api/v2/users/admin@streampipes.org/nodes";
-    private static final String NODE_SYNC_UPDATE_ROUTE = "/api/v2/users/admin@streampipes.org/nodes/sync";
+    private static final String BACKEND_BASE_ROUTE = "/streampipes-backend/api/v2";
+    private static final String BACKEND_VERSION_ROUTE = BACKEND_BASE_ROUTE + "/info/versions";
+    private static final String NODE_REGISTRATION_ROUTE = BACKEND_BASE_ROUTE + "/users/admin@streampipes.org/nodes";
+    private static final String NODE_SYNC_UPDATE_ROUTE = BACKEND_BASE_ROUTE + "/users/admin@streampipes.org/nodes/sync";
     private static final long RETRY_INTERVAL_MS = 5000;
-    private static final int CONNECT_TIMEOUT_MS = 10000;
 
     private NodeInfoDescription nodeInfo = new NodeInfoDescription();
 
@@ -68,7 +62,7 @@ public class NodeManager {
         nodeInfo = n;
     }
 
-    public NodeInfoDescription retrieveNodeInfoDescription() {
+    public NodeInfoDescription getNode() {
         return nodeInfo;
     }
 
@@ -105,32 +99,32 @@ public class NodeManager {
         add(nodeInfoDescription);
     }
 
-    public Message updateNodeInfoDescription(NodeInfoDescription desc) {
+    public Message updateNode(NodeInfoDescription desc) {
         LOG.info("Update node description for node controller: {}", desc.getNodeControllerId());
         this.nodeInfo = desc;
         return Notifications.success(NotificationType.OPERATION_SUCCESS);
     }
 
-    public Message activate() {
+    public Message activateNode() {
         LOG.info("Activate node controller");
         this.nodeInfo.setActive(true);
         return Notifications.success(NotificationType.OPERATION_SUCCESS);
     }
 
-    public Message deactivate() {
+    public Message deactivateNode() {
         LOG.info("Deactivate node controller");
         this.nodeInfo.setActive(false);
         return Notifications.success(NotificationType.OPERATION_SUCCESS);
     }
 
-    public void addToRegisteredContainers(DockerContainer container) {
+    public void registerContainer(DockerContainer container) {
         if(!registered(container)) {
             this.nodeInfo.addRegisteredContainer(container);
             syncWithNodeClusterManager();
         }
     }
 
-    public void removeFromRegisteredContainers(DockerContainer container) {
+    public void deregisterContainer(DockerContainer container) {
         if(registered(container)) {
             this.nodeInfo.removeRegisteredContainer(container);
             syncWithNodeClusterManager();
@@ -145,155 +139,92 @@ public class NodeManager {
 
     public boolean register() {
         boolean connected = false;
-        boolean registered = false;
-        try {
-            String body = JacksonSerializer.getObjectMapper().writeValueAsString(this.nodeInfo);
-            String endpoint = generateRegistrationEndpoint();
-
-            while (!connected) {
-                LOG.info("Trying to register node controller at StreamPipes cluster management: " + endpoint);
-                connected = isReady(NodeConfiguration.getBackendHost(), NodeConfiguration.getBackendPort());
-                if (!connected) {
-                    LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
-                    try {
-                        Thread.sleep(RETRY_INTERVAL_MS);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
+        boolean registered;
+        String host = NodeConfiguration.getBackendHost();
+        int port = NodeConfiguration.getBackendPort();
+        String endpoint = HttpUtils.generateEndpoint(host, port, NODE_REGISTRATION_ROUTE);
+
+        while (!connected) {
+            LOG.info("Trying to register node controller at StreamPipes cluster management: " + endpoint);
+            connected = SocketUtils.isReady(NodeConfiguration.getBackendHost(), NodeConfiguration.getBackendPort());
+            if (!connected) {
+                LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+                try {
+                    Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
                 }
             }
+        }
 
-            registered = post(endpoint, body);
-            if (registered) {
-                LOG.info("Successfully registered node controller at StreamPipes cluster management");
-            }
-
-        } catch (IOException e) {
-            e.printStackTrace();
+        registered = HttpUtils.post(endpoint, this.nodeInfo);
+        if (registered) {
+            LOG.info("Successfully registered node controller at StreamPipes cluster management");
         }
+
         return registered;
     }
 
 
     private boolean syncWithNodeClusterManager() {
         boolean connected = false;
-        try {
-            String body = JacksonSerializer.getObjectMapper().writeValueAsString(this.nodeInfo);
-            String endpoint = generateSyncronizationEndpoint();
-
-            LOG.info("Trying to sync node updates with StreamPipes cluster management: " + endpoint);
-
-            while (!connected) {
-                connected = post(endpoint, body);
-                if (!connected) {
-                    LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
-                    try {
-                        Thread.sleep(RETRY_INTERVAL_MS);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
+        String host = NodeConfiguration.getBackendHost();
+        int port = NodeConfiguration.getBackendPort();
+
+        String endpoint = HttpUtils.generateEndpoint(host, port, NODE_SYNC_UPDATE_ROUTE);
+
+        LOG.info("Trying to sync node updates with StreamPipes cluster management: " + endpoint);
+
+        while (!connected) {
+            connected = HttpUtils.post(endpoint, this.nodeInfo);
+            if (!connected) {
+                LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+                try {
+                    Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
                 }
             }
-            LOG.info("Successfully synced node controller with StreamPipes cluster management");
-        } catch (IOException e) {
-            e.printStackTrace();
         }
+        LOG.info("Successfully synced node controller with StreamPipes cluster management");
         return connected;
     }
 
     public String getStreamPipesVersion() {
         boolean connected = false;
         VersionInfo versionInfo = new VersionInfo();
-        try {
-            String endpoint = generateVersionEndpoint();
+        String host = NodeConfiguration.getBackendHost();
+        int port = NodeConfiguration.getBackendPort();
 
-            LOG.info("Trying to retrieve StreamPipes version from backend: " + endpoint);
+        String endpoint = HttpUtils.generateEndpoint(host, port, BACKEND_VERSION_ROUTE);
+        String bearerToken = NodeConfiguration.getNodeApiKey();
 
-            while (!connected) {
-                Response response = get(endpoint);
-                versionInfo = deserialize(response, VersionInfo.class);
+        LOG.info("Trying to retrieve StreamPipes version from backend: " + endpoint);
 
-                if (versionInfo.getBackendVersion() != null) {
-                    connected = true;
-                }
+        while (!connected) {
+            versionInfo = HttpUtils.get(endpoint, bearerToken, VersionInfo.class);
+
+            if (versionInfo.getBackendVersion() != null) {
+                connected = true;
+            }
 
-                if (!connected) {
-                    LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
-                    try {
-                        Thread.sleep(RETRY_INTERVAL_MS);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
+            if (!connected) {
+                LOG.info("Retrying in {} seconds", (RETRY_INTERVAL_MS / 1000));
+                try {
+                    Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
                 }
             }
-            LOG.info("Successfully retrieved StreamPipes version from backend");
-        } catch (IOException e) {
-            e.printStackTrace();
         }
+        LOG.info("Successfully retrieved StreamPipes version from backend");
         return versionInfo.getBackendVersion();
     }
 
-    private Response get(String endpoint) throws IOException {
-        return Request.Get(endpoint)
-                .connectTimeout(CONNECT_TIMEOUT_MS)
-                .addHeader("Authorization", "Bearer " + NodeConfiguration.getNodeApiKey())
-                .execute();
-    }
-
-    private boolean post(String endpoint, String body) throws IOException {
-        Response response = Request.Post(endpoint)
-                .bodyString(body, ContentType.APPLICATION_JSON)
-                .connectTimeout(CONNECT_TIMEOUT_MS)
-                //.addHeader("Authorization", "Bearer " + NodeControllerConfig.INSTANCE.getApiKey())
-                .execute();
-        return handleResponse(response);
-    }
-
     // Helpers
 
-    private static boolean isReady(String host, int port) {
-        try {
-            InetSocketAddress sa = new InetSocketAddress(host, port);
-            Socket ss = new Socket();
-            ss.connect(sa, 1000);
-            ss.close();
-        } catch(Exception e) {
-            return false;
-        }
-        return true;
-    }
-
     private boolean registered(DockerContainer container) {
         return this.nodeInfo.getRegisteredContainers().contains(container);
     }
 
-    private boolean handleResponse(Response response) throws IOException {
-        SuccessMessage message = deserialize(response, SuccessMessage.class);
-        return message.isSuccess();
-    }
-
-    private <T> T deserialize(Response response, Class<T> clazz) throws IOException {
-        String resp = response.returnContent().asString();
-        return JacksonSerializer.getObjectMapper().readValue(resp, clazz);
-    }
-
-    private String generateRegistrationEndpoint() {
-        return generateBaseEndpoint() + NODE_REGISTRATION_ROUTE;
-    }
-
-    private String generateSyncronizationEndpoint() {
-        return generateBaseEndpoint() + NODE_SYNC_UPDATE_ROUTE;
-    }
-
-    private String generateVersionEndpoint() {
-        return generateBaseEndpoint() + BACKEND_VERSION_ROUTE;
-    }
-
-    private String generateBaseEndpoint() {
-        return  PROTOCOL
-                + NodeConfiguration.getBackendHost()
-                + COLON
-                + NodeConfiguration.getBackendPort()
-                + BACKEND_BASE_ROUTE;
-    }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/DockerEngineManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/DockerEngineManager.java
index 40a98f6..af9b163 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/DockerEngineManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/DockerEngineManager.java
@@ -61,7 +61,7 @@ public class DockerEngineManager implements IContainerEngine {
             ContainerDeploymentStatus status = deploy(container);
 
             if (status.getStatus() == ContainerStatus.DEPLOYED) {
-                NodeManager.getInstance().addToRegisteredContainers(status.getContainer());
+                NodeManager.getInstance().registerContainer(status.getContainer());
             }
         });
     }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java
index b8272a9..e5fb8fa 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/AbstractStreamPipesDockerContainer.java
@@ -36,11 +36,18 @@ public abstract class AbstractStreamPipesDockerContainer {
 
     public List<String> generateStreamPipesNodeEnvs() {
         return new ArrayList<>(Arrays.asList(
-                toEnv(EnvConfigParam.NODE_CONTROLLER_ID.getEnvironmentKey(), NodeConfiguration.getNodeControllerId()),
-                toEnv(EnvConfigParam.NODE_CONTROLLER_CONTAINER_HOST.getEnvironmentKey(), NodeConfiguration.getNodeHost()),
-                toEnv( EnvConfigParam.NODE_CONTROLLER_CONTAINER_PORT.getEnvironmentKey(), NodeConfiguration.getNodeControllerPort()),
-                toEnv(EnvConfigParam.NODE_BROKER_CONTAINER_HOST.getEnvironmentKey(), NodeConfiguration.getNodeBrokerHost()),
-                toEnv(EnvConfigParam.NODE_BROKER_CONTAINER_PORT.getEnvironmentKey(), NodeConfiguration.getNodeBrokerPort())
+                toEnv(EnvConfigParam.NODE_CONTROLLER_ID.getEnvironmentKey(),
+                        NodeConfiguration.getNodeControllerId()),
+                toEnv(EnvConfigParam.NODE_CONTROLLER_CONTAINER_HOST.getEnvironmentKey(),
+                        NodeConfiguration.getNodeHost()),
+                toEnv( EnvConfigParam.NODE_CONTROLLER_CONTAINER_PORT.getEnvironmentKey(),
+                        NodeConfiguration.getNodeControllerPort()),
+                toEnv(EnvConfigParam.NODE_BROKER_CONTAINER_HOST.getEnvironmentKey(),
+                        NodeConfiguration.getNodeBrokerHost()),
+                toEnv(EnvConfigParam.NODE_BROKER_CONTAINER_PORT.getEnvironmentKey(),
+                        NodeConfiguration.getNodeBrokerPort()),
+                toEnv(EnvConfigParam.CONSUL_LOCATION.getEnvironmentKey(),
+                        NodeConfiguration.getBackendHost())
         ));
     }
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
index 6748799..0080900 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/orchestrator/docker/utils/DockerUtils.java
@@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -173,15 +175,15 @@ public class DockerUtils {
         return Collections.emptyList();
     }
 
-    private static HostConfig getHostConfig(String network, String[] ports) {
+    private HostConfig getHostConfig(String network, String[] ports) {
         return getHostConfig(network, ports, null);
     }
 
-    private static HostConfig getHostConfig(String network) {
+    private HostConfig getHostConfig(String network) {
         return getHostConfig(network, null, null);
     }
 
-    private static HostConfig getHostConfig(String network, String[] ports, List<String> volumes) {
+    private HostConfig getHostConfig(String network, String[] ports, List<String> volumes) {
         Map<String, List<PortBinding>> portBindings = new HashMap<>();
         if (ports != null) {
             for (String port : ports) {
@@ -189,6 +191,9 @@ public class DockerUtils {
             }
         }
 
+        // find extra hosts in node controller that should also be added to the container
+        ImmutableList<String> configuredExtraHosts = findConfiguredExtraHostsFromNodeController();
+
         if (Objects.requireNonNull(volumes).size() > 0) {
             List<HostConfig.Bind> allVolumeBinds = new ArrayList<>();
             for (String volume: volumes) {
@@ -202,6 +207,7 @@ public class DockerUtils {
             return HostConfig.builder()
                     .portBindings(portBindings)
                     .networkMode(network)
+                    .extraHosts(configuredExtraHosts)
                     .restartPolicy(HostConfig.RestartPolicy.unlessStopped())
                     .appendBinds(allVolumeBinds.toArray(new HostConfig.Bind[0]))
                     .build();
@@ -209,6 +215,7 @@ public class DockerUtils {
             return HostConfig.builder()
                     .portBindings(portBindings)
                     .networkMode(network)
+                    .extraHosts(configuredExtraHosts)
                     .restartPolicy(HostConfig.RestartPolicy.unlessStopped())
                     .build();
         }
@@ -255,7 +262,7 @@ public class DockerUtils {
         }
     }
 
-        public DockerInfo getDockerInfo() {
+    public DockerInfo getDockerInfo() {
         DockerInfo dockerInfo = new DockerInfo();
         try {
             Info info = docker.info();
@@ -350,4 +357,30 @@ public class DockerUtils {
         }
         return modifyPorts.toArray(new String[0]);
     }
+
+    // TODO: find better option to find specified extra hosts
+    private ImmutableList<String> findConfiguredExtraHostsFromNodeController() {
+        ImmutableList<String> extraHosts = null;
+        try {
+            // currently assumes that node controller container name = hostname
+            String hostname = InetAddress.getLocalHost().getHostName();
+            Optional<Container> container = DockerUtils.getInstance().getContainerList()
+                    .stream()
+                    .filter(c -> c.names().contains("/" + hostname))
+                    .findAny();
+
+            if(container.isPresent()) {
+                String id = container.get().id();
+                extraHosts =  docker
+                        .inspectContainer(id)
+                        .hostConfig()
+                        .extraHosts();
+            }
+
+            return extraHosts;
+
+        } catch (DockerException | InterruptedException | UnknownHostException e) {
+            throw new SpRuntimeException("Failed to find configured extra hosts in node controller");
+        }
+    }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
index 3e2ad80..39215e0 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/pe/InvocableElementManager.java
@@ -275,12 +275,12 @@ public class InvocableElementManager implements IPipelineElementLifeCycle {
     }
 
     private NodeInfoDescription getNodeInfoDescription() {
-        return NodeManager.getInstance().retrieveNodeInfoDescription();
+        return NodeManager.getInstance().getNode();
     }
 
     private void setSupportedPipelineElements(List<String> supportedPipelineElements) {
         NodeManager.getInstance()
-                .retrieveNodeInfoDescription()
+                .getNode()
                 .setSupportedElements(supportedPipelineElements);
     }
 
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
new file mode 100644
index 0000000..b97386c
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/HttpUtils.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.node.controller.config.NodeConfiguration;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class HttpUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(HttpUtils.class);
+
+    private static final String HTTP_PROTOCOL = "http";
+    private static final int CONNECT_TIMEOUT = 1000;
+
+    /**
+     * Executes a GET request and returns the raw fluent response; I/O failures are rethrown as
+     * {@link SpRuntimeException}. The caller has to consume the response to release the connection.
+     */
+    public static Response get(String url) {
+        try {
+            return Request.Get(url)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+        } catch (IOException e) {
+            throw new SpRuntimeException("Something went wrong during GET request", e);
+        }
+    }
+
+    /** Executes a GET request and deserializes the JSON response body into {@code clazz}. */
+    public static <T>T get(String url, Class<T> clazz) {
+        return fetch(Request.Get(url), clazz);
+    }
+
+    /** Like {@link #get(String, Class)}, with {@code bearerToken} sent as Bearer authorization. */
+    public static <T>T get(String url, String bearerToken, Class<T> clazz) {
+        return fetch(Request.Get(url).addHeader("Authorization", "Bearer " + bearerToken), clazz);
+    }
+
+    /** Sends {@code body} as JSON via PUT; returns {@code false} if an I/O error occurred. */
+    public static boolean put(String url, String body) {
+        return send(Request.Put(url), body, "PUT");
+    }
+
+    /** Sends {@code body} as JSON via POST; returns {@code false} if an I/O error occurred. */
+    public static boolean post(String url, String body) {
+        return send(Request.Post(url), body, "POST");
+    }
+
+    /** Serializes {@code object} to JSON and POSTs it like {@link #post(String, String)}. */
+    public static <T> boolean post(String url, T object) {
+        return send(Request.Post(url), serialize(object), "POST");
+    }
+
+    /**
+     * Builds {@code http://host:port/route}; one leading slash of {@code route} is dropped.
+     *
+     * @throws SpRuntimeException if {@code host} or {@code route} is null or {@code port} is -1
+     */
+    public static String generateEndpoint(String host, int port, String route) {
+        if (host == null || route == null || port == -1) {
+            throw new SpRuntimeException("Could not generate endpoint");
+        }
+        String path = route.startsWith("/") ? route.substring(1) : route;
+        return String.format("%s://%s:%s/%s", HTTP_PROTOCOL, host, port, path);
+    }
+
+    /** Serializes {@code object} to JSON; failures surface as {@link SpRuntimeException}. */
+    public static <T> String serialize(T object) {
+        try {
+            return JacksonSerializer.getObjectMapper().writeValueAsString(object);
+        } catch (JsonProcessingException e) {
+            throw new SpRuntimeException("Could not serialize object", e);
+        }
+    }
+
+    /** Deserializes JSON into {@code clazz}; failures surface as {@link SpRuntimeException}. */
+    public static <T>T deserialize(String objectString, Class<T> clazz) {
+        try {
+            return JacksonSerializer.getObjectMapper().readValue(objectString, clazz);
+        } catch (JsonProcessingException e) {
+            throw new SpRuntimeException("Could not deserialize object", e);
+        }
+    }
+
+    /** Shared GET path: applies the connect timeout, executes and deserializes the body. */
+    private static <T> T fetch(Request request, Class<T> clazz) {
+        try {
+            String json = request
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute()
+                    .returnContent()
+                    .asString();
+            return deserialize(json, clazz);
+        } catch (IOException e) {
+            throw new SpRuntimeException("Something went wrong during GET request", e);
+        }
+    }
+
+    /**
+     * Shared PUT/POST path: sends {@code body} as JSON and discards the response content.
+     *
+     * @param method HTTP method name, only used in the log message
+     * @return {@code true} once the request was executed, {@code false} on an I/O error
+     */
+    private static boolean send(Request request, String body, String method) {
+        try {
+            request.bodyString(body, ContentType.APPLICATION_JSON)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute()
+                    // the fluent Response keeps its connection until consumed; discard to release it
+                    .discardContent();
+            return true;
+        } catch (IOException e) {
+            LOG.error("Something went wrong during " + method + " request", e);
+            return false;
+        }
+    }
+}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/SocketUtils.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/SocketUtils.java
new file mode 100644
index 0000000..4cd59fc
--- /dev/null
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/utils/SocketUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+public class SocketUtils {
+    private static final int SOCKET_TIMEOUT_MS = 1000;
+
+    /** Returns true if a TCP connection to host:port can be opened within the timeout. */
+    public static boolean isReady(String host, int port) {
+        // try-with-resources closes the socket even when connect() fails or times out
+        try (Socket socket = new Socket()) {
+            socket.connect(new InetSocketAddress(host, port), SOCKET_TIMEOUT_MS);
+            return true;
+        } catch (Exception e) {
+            // any failure (bad port, unresolved host, refused, timeout) means "not ready"
+            return false;
+        }
+    }
+}