You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/12/22 08:36:36 UTC

[incubator-streampipes] branch edge-extensions updated: [WIP] use DockerConstants to store all config for local deployment

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/edge-extensions by this push:
     new aa02d98  [WIP] use DockerConstants to store all config for local deployment
aa02d98 is described below

commit aa02d98d6664a051f1f8c2bf38f9633472239628
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Dec 22 09:36:22 2020 +0100

    [WIP] use DockerConstants to store all config for local deployment
---
 .../model/node/DockerContainerBuilder.java         |  10 +-
 .../apache/streampipes/model/node/NodeInfo.java    |   9 ++
 .../streampipes/model/node/NodeInfoBuilder.java    |   9 ++
 ...rContainerInit.java => NodeControllerInit.java} |  32 +++---
 .../container/config/NodeControllerConfig.java     |   2 +-
 ...NodeJanitorManager.java => JanitorManager.java} |  21 ++--
 .../container/management/node/NodeManager.java     |  30 +++++-
 .../management/orchestrator/ContainerStatus.java   |  26 -----
 .../orchestrator/docker/ContainerType.java         |  22 ----
 .../orchestrator/docker/DockerConstants.java       | 114 +++++++++++++++++++++
 ...chestrator.java => DockerContainerManager.java} |  29 +++---
 .../docker/StreamPipesNodeContainer.java           |  95 -----------------
 .../orchestrator/docker/utils/DockerUtils.java     |  34 +++---
 .../container/rest/InvocableEntityResource.java    |   8 +-
 14 files changed, 231 insertions(+), 210 deletions(-)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/DockerContainerBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/DockerContainerBuilder.java
index 69a58b2..b94d358 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/DockerContainerBuilder.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/DockerContainerBuilder.java
@@ -19,13 +19,13 @@ import java.util.*;
 
 public class DockerContainerBuilder {
 
-    private DockerContainer dockerContainer;
-    private String imageURI;
-    private String containerName;
+    private final DockerContainer dockerContainer;
+    private final String imageURI;
+    private final String containerName;
     private String serviceId;
     private final String [] containerPorts;
-    private List<String> envVars;
-    private Map<String, String> labels;
+    private final List<String> envVars;
+    private final Map<String, String> labels;
 
     public DockerContainerBuilder(String id) {
         this.dockerContainer = new DockerContainer();
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfo.java
index e79f254..f9c97b7 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfo.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfo.java
@@ -30,6 +30,7 @@ public class NodeInfo {
     private NodeBrokerInfo nodeBrokerInfo;
     private NodeResources nodeResources;
     private List<String> supportedPipelineElementAppIds;
+    private List<DockerContainer> registeredDockerContainer;
 
     public NodeInfo() {
 
@@ -82,4 +83,12 @@ public class NodeInfo {
     public void setNodeBrokerInfo(NodeBrokerInfo nodeBrokerInfo) {
         this.nodeBrokerInfo = nodeBrokerInfo;
     }
+
+    public List<DockerContainer> getRegisteredDockerContainer() {
+        return registeredDockerContainer;
+    }
+
+    public void setRegisteredDockerContainer(List<DockerContainer> registeredDockerContainer) {
+        this.registeredDockerContainer = registeredDockerContainer;
+    }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
index eb7a2f6..40483dc 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
@@ -30,6 +30,7 @@ public class NodeInfoBuilder {
     private NodeMetadata nodeMetadata;
     private NodeResources nodeResources;
     private List<String> supportedPipelineElementAppIds;
+    private List<DockerContainer> registeredDockerContainer;
     private NodeBrokerInfo nodeBrokerInfo;
 
     private NodeInfoBuilder(String nodeId) {
@@ -38,6 +39,7 @@ public class NodeInfoBuilder {
         this.nodeMetadata = new NodeMetadata();
         this.nodeResources = new NodeResources();
         this.supportedPipelineElementAppIds = new ArrayList<>();
+        this.registeredDockerContainer = new ArrayList<>();
         this.nodeBrokerInfo = new NodeBrokerInfo();
     }
 
@@ -99,6 +101,12 @@ public class NodeInfoBuilder {
         return this;
     }
 
+
+    public NodeInfoBuilder withRegisteredDockerContainer(List<DockerContainer> registeredDockerContainer) {
+        this.registeredDockerContainer = registeredDockerContainer;
+        return this;
+    }
+
     private JmsTransportProtocol makeJmsTransportProtocol(String brokerHost, Integer brokerPort) {
         return new JmsTransportProtocol(brokerHost, brokerPort);
     }
@@ -109,6 +117,7 @@ public class NodeInfoBuilder {
         nodeInfo.setNodeResources(nodeResources);
         nodeInfo.setNodeBrokerInfo(nodeBrokerInfo);
         nodeInfo.setSupportedPipelineElementAppIds(supportedPipelineElementAppIds);
+        nodeInfo.setRegisteredDockerContainer(registeredDockerContainer);
         return nodeInfo;
     }
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
similarity index 75%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
index a785ca2..6e61480 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
@@ -18,11 +18,11 @@
 package org.apache.streampipes.node.controller.container;
 
 import org.apache.streampipes.container.util.ConsulUtil;
-import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerManager;
 import org.apache.streampipes.node.controller.container.rest.NodeControllerResourceConfig;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.node.NodeManager;
-import org.apache.streampipes.node.controller.container.management.janitor.NodeJanitorManager;
+import org.apache.streampipes.node.controller.container.management.janitor.JanitorManager;
 import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,38 +37,38 @@ import java.util.Collections;
 @Configuration
 @EnableAutoConfiguration
 @Import({ NodeControllerResourceConfig.class })
-public class NodeControllerContainerInit {
+public class NodeControllerInit {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(NodeControllerContainerInit.class.getCanonicalName());
+            LoggerFactory.getLogger(NodeControllerInit.class.getCanonicalName());
 
     public static void main(String [] args) {
 
-        NodeControllerConfig nodeConfig = NodeControllerConfig.INSTANCE;
+        NodeControllerConfig conf = NodeControllerConfig.INSTANCE;
 
-        SpringApplication app = new SpringApplication(NodeControllerContainerInit.class);
-        app.setDefaultProperties(Collections.singletonMap("server.port", nodeConfig.getNodeControllerPort()));
+        SpringApplication app = new SpringApplication(NodeControllerInit.class);
+        app.setDefaultProperties(Collections.singletonMap("server.port", conf.getNodeControllerPort()));
         app.run();
 
-        LOG.info("Load static node metadata");
-        NodeManager.init();
+        LOG.info("Load node info");
+        NodeManager.getInstance().init();
 
-        LOG.info("Start Node Resource manager");
+        LOG.info("Start Node resource manager");
         ResourceManager.getInstance().run();
 
         if (!"true".equals(System.getenv("SP_DEBUG"))) {
             LOG.info("Auto-deploy StreamPipes node container");
-            DockerContainerOrchestrator.getInstance().init();
+            DockerContainerManager.getInstance().init();
 
-            LOG.info("Start Node Janitor manager");
-            NodeJanitorManager.getInstance().run();
+            LOG.info("Start Janitor manager");
+            JanitorManager.getInstance().run();
         }
 
         // registration with consul here
         ConsulUtil.registerNodeService(
-                nodeConfig.getNodeServiceId(),
-                nodeConfig.getNodeHostName(),
-                nodeConfig.getNodeControllerPort()
+                conf.getNodeServiceId(),
+                conf.getNodeHostName(),
+                conf.getNodeControllerPort()
         );
     }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
index 958690e..6c733da 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
@@ -34,7 +34,7 @@ public enum NodeControllerConfig {
     private static final String DEFAULT_NODE_CONTROLLER_ID = "node-controller";
     private static final int DEFAULT_NODE_CONTROLLER_PORT = 7077;
     private static final String DEFAULT_NODE_TYPE = "edge";
-    private static final String DEFAULT_NODE_BROKER_HOST = "node-broker";
+    private static final String DEFAULT_NODE_BROKER_HOST = "broker";
     private static final int DEFAULT_NODE_BROKER_PORT = 1883;
     private static final String DEFAULT_NODE_HOST_NAME = "host.docker.internal";
     private static final String DEFAULT_BACKEND_HOST = "host.docker.internal";
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/JanitorManager.java
similarity index 74%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/JanitorManager.java
index b30b2c9..818e7eb 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/JanitorManager.java
@@ -26,22 +26,20 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-public class NodeJanitorManager {
+public class JanitorManager {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(NodeJanitorManager.class.getCanonicalName());
+            LoggerFactory.getLogger(JanitorManager.class.getCanonicalName());
 
-    private ScheduledExecutorService scheduledExecutorService = null;
+    private static JanitorManager instance = null;
 
-    private static NodeJanitorManager instance = null;
+    private JanitorManager() {}
 
-    private NodeJanitorManager() {}
-
-    public static NodeJanitorManager getInstance() {
+    public static JanitorManager getInstance() {
         if (instance == null) {
-            synchronized (NodeJanitorManager.class) {
+            synchronized (JanitorManager.class) {
                 if (instance == null)
-                    instance = new NodeJanitorManager();
+                    instance = new JanitorManager();
             }
         }
         return instance;
@@ -51,8 +49,9 @@ public class NodeJanitorManager {
     public void run() {
         LOG.debug("Create Janitor scheduler");
 
-        scheduledExecutorService =  Executors.newScheduledThreadPool(1);
-        scheduledExecutorService.scheduleAtFixedRate(pruneDocker, 30, NodeControllerConfig.INSTANCE.getPruningFreq(), TimeUnit.MINUTES);
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+        scheduledExecutorService.scheduleAtFixedRate(pruneDocker, 30,
+                NodeControllerConfig.INSTANCE.getPruningFreq(), TimeUnit.MINUTES);
     }
 
     private final Runnable pruneDocker = () -> {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
index 910aae4..a5f57e1 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.node.controller.container.management.node;
 
+import com.spotify.docker.client.messages.Container;
 import org.apache.streampipes.model.node.*;
 import org.apache.streampipes.model.node.resources.hardware.HardwareResource;
 import org.apache.streampipes.model.node.resources.hardware.CPU;
@@ -39,6 +40,8 @@ import oshi.software.os.OSFileStore;
 import oshi.software.os.OperatingSystem;
 
 import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class NodeManager {
 
@@ -71,7 +74,7 @@ public class NodeManager {
         return nodeInfo;
     }
 
-    public static void init() {
+    public void init() {
 
         NodeInfo nodeInfo = NodeInfoBuilder.create(getNodeControllerId())
                 .withNodeControllerPort(getNodeControllerPort())
@@ -81,12 +84,36 @@ public class NodeManager {
                 .withNodeModel(getNodeModel())
                 .withNodeResources(getNodeResources())
                 .withNodeBroker(getNodeBrokerHost(), getNodeBrokerPort())
+                .withRegisteredDockerContainer(getRegisteredDockerContainer())
                 .withSupportedPipelineElements(getSupportedPipelineElements())
                 .build();
 
         NodeManager.getInstance().add(nodeInfo);
     }
 
+    private static List<DockerContainer> getRegisteredDockerContainer() {
+        List<DockerContainer> containers = new ArrayList<>();
+        DockerUtils.getInstance().getRunningStreamPipesContainer()
+                .forEach(rc -> {
+                    DockerContainer c = new DockerContainer();
+                    c.setContainerName(rc.names().get(0));
+                    c.setImageURI(rc.image());
+
+                    Optional<String> serviceId = rc.labels().entrySet().stream()
+                            .filter(l -> l.getKey().contains("org.apache.streampipes.service.id"))
+                            .map(Map.Entry::getValue)
+                            .findFirst();
+
+                    serviceId.ifPresent(c::setServiceId);
+
+                    c.setLabels(rc.labels().entrySet().stream()
+                            .filter(l -> l.getKey().contains("org.apache.streampipes"))
+                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+                    containers.add(c);
+                });
+        return containers;
+    }
+
     private static String getNodeType() {
         return NodeControllerConfig.INSTANCE.getNodeType();
     }
@@ -201,7 +228,6 @@ public class NodeManager {
     }
 
     private static Docker getDocker() {
-
         Docker docker = new Docker();
         docker.setHasDocker(true);
         docker.setHasNvidiaRuntime(DockerInfo.isHasNvidiaRuntime());
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java
deleted file mode 100644
index 00a2c75..0000000
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.controller.container.management.orchestrator;
-
-public enum ContainerStatus {
-    DEPLOYED,
-    RUNNING,
-    STOPPED,
-    REMOVED,
-    UNKNOWN
-}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/ContainerType.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/ContainerType.java
deleted file mode 100644
index e5e3924..0000000
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/ContainerType.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
-
-public enum ContainerType {
-    EXTENSIONS, BROKER
-}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerConstants.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerConstants.java
new file mode 100644
index 0000000..d5b0a98
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerConstants.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
+
+import org.apache.streampipes.model.node.DockerContainer;
+import org.apache.streampipes.model.node.DockerContainerBuilder;
+import org.apache.streampipes.node.controller.container.config.ConfigKeys;
+import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
+
+import java.util.*;
+
+public class DockerConstants {
+
+    public static final String SP_DOCKER_CONTAINER_REPOSITORY = "apachestreampipes";
+    public static final String SP_VERSION = NodeControllerConfig.INSTANCE.getSpVersion();
+    public static final String SP_DOCKER_CONTAINER_NAME_PREFIX = "streampipes_";
+    public static final String SP_DOCKER_NETWORK_NAME = "spnet";
+
+    // extensions container
+    public static final String SP_EXTENSIONS_ID = "pe/org.apache.streampipes.extensions.all.jvm";
+    public static final String SP_DOCKER_IMAGE_EXTENSIONS_NAME = "extensions-all-jvm";
+    public static final String SP_DOCKER_CONTAINER_EXTENSIONS_IMG_TAG = SP_DOCKER_CONTAINER_REPOSITORY +
+            "/" + SP_DOCKER_IMAGE_EXTENSIONS_NAME + ":" + SP_VERSION;
+    public static final String SP_DOCKER_CONTAINER_EXTENSIONS_NAME = SP_DOCKER_CONTAINER_NAME_PREFIX + "extensions";
+    public static final String[] SP_DOCKER_CONTAINER_EXTENSIONS_PORT = new String[]{"7025"};
+
+    // broker container
+    public static final String SP_BROKER_ID = "pe/org.apache.streampipes.node.broker";
+    public static final String SP_DOCKER_IMAGE_BROKER_NAME = "eclipse-mosquitto";
+    public static final String SP_DOCKER_IMAGE_BROKER_VERSION = "1.6.12";
+    public static final String SP_DOCKER_CONTAINER_BROKER_IMG_TAG =
+            SP_DOCKER_IMAGE_BROKER_NAME + ":" + SP_DOCKER_IMAGE_BROKER_VERSION;
+    public static final String SP_DOCKER_CONTAINER_BROKER_NAME = SP_DOCKER_CONTAINER_NAME_PREFIX + "broker";
+    public static final String[] SP_DOCKER_CONTAINER_BROKER_PORT =
+            System.getenv(ConfigKeys.NODE_BROKER_CONTAINER_PORT) != null ?
+                    new String[]{System.getenv(ConfigKeys.NODE_BROKER_CONTAINER_PORT)} : new String[]{"1883"};
+
+    public static final List<String> SP_DOCKER_CONTAINER_ENV_VARIABLES = Arrays.asList(
+            ConfigKeys.NODE_CONTROLLER_ID + "=" + NodeControllerConfig.INSTANCE.getNodeControllerId(),
+            ConfigKeys.NODE_CONTROLLER_CONTAINER_HOST +  "=" + NodeControllerConfig.INSTANCE.getNodeHostName(),
+            ConfigKeys.NODE_CONTROLLER_CONTAINER_PORT + "=" + NodeControllerConfig.INSTANCE.getNodeControllerPort(),
+            ConfigKeys.NODE_BROKER_CONTAINER_HOST + "=" + NodeControllerConfig.INSTANCE.getNodeBrokerHost(),
+            ConfigKeys.NODE_BROKER_CONTAINER_PORT + "=" + NodeControllerConfig.INSTANCE.getNodeBrokerPort()
+    );
+
+    public static final Map<String, String> SP_DOCKER_CONTAINER_EXTENSIONS_LABELS = new HashMap<String, String>() {{
+        put("org.apache.streampipes.service.id", SP_EXTENSIONS_ID);
+        put("org.apache.streampipes.node.type", NodeControllerConfig.INSTANCE.getNodeType());
+        put("org.apache.streampipes.container.type", "extensions");
+    }};
+
+    public static final Map<String, String> SP_DOCKER_CONTAINER_BROKER_LABELS = new HashMap<String, String>() {{
+        put("org.apache.streampipes.service.id", SP_BROKER_ID);
+        put("org.apache.streampipes.node.type", NodeControllerConfig.INSTANCE.getNodeType());
+        put("org.apache.streampipes.container.type", "broker");
+    }};
+
+
+    public enum ContainerStatus {
+        DEPLOYED, RUNNING, STOPPED, REMOVED, UNKNOWN
+    }
+
+    public enum ContainerType {
+        EXTENSIONS, BROKER
+    }
+
+    public enum NodeContainer {
+        INSTANCE;
+
+        public List<DockerContainer> getAllStreamPipesContainer() {
+            List<DockerContainer> nodeContainers = new ArrayList<>();
+
+            // Extensions container
+            DockerContainer extensions = DockerContainerBuilder.create(SP_EXTENSIONS_ID)
+                    .withImage(DockerConstants.SP_DOCKER_CONTAINER_EXTENSIONS_IMG_TAG)
+                    .withName(DockerConstants.SP_DOCKER_CONTAINER_EXTENSIONS_NAME)
+                    .withExposedPorts(DockerConstants.SP_DOCKER_CONTAINER_EXTENSIONS_PORT)
+                    .withEnvironmentVariables(SP_DOCKER_CONTAINER_ENV_VARIABLES)
+                    .withLabels(DockerConstants.SP_DOCKER_CONTAINER_EXTENSIONS_LABELS)
+                    .build();
+
+            // Node broker container
+            DockerContainer nodeBroker = DockerContainerBuilder.create(SP_BROKER_ID)
+                    .withImage(DockerConstants.SP_DOCKER_CONTAINER_BROKER_IMG_TAG)
+                    .withName(DockerConstants.SP_DOCKER_CONTAINER_BROKER_NAME)
+                    .withExposedPorts(DockerConstants.SP_DOCKER_CONTAINER_BROKER_PORT)
+                    .withEnvironmentVariables(SP_DOCKER_CONTAINER_ENV_VARIABLES)
+                    .withLabels(SP_DOCKER_CONTAINER_BROKER_LABELS)
+                    .build();
+
+            nodeContainers.add(extensions);
+            nodeContainers.add(nodeBroker);
+
+            return nodeContainers;
+        }
+
+    }
+
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerManager.java
similarity index 84%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerManager.java
index fff9158..f07446a 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerManager.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.streampipes.container.util.ConsulUtil;
 import org.apache.streampipes.model.node.DockerContainer;
 import org.apache.streampipes.node.controller.container.management.orchestrator.ContainerOrchestrator;
-import org.apache.streampipes.node.controller.container.management.orchestrator.ContainerStatus;
 import org.apache.streampipes.node.controller.container.management.orchestrator.docker.utils.DockerUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,21 +33,21 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.*;
 
-public class DockerContainerOrchestrator implements ContainerOrchestrator {
+public class DockerContainerManager implements ContainerOrchestrator {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(DockerContainerOrchestrator.class.getCanonicalName());
+            LoggerFactory.getLogger(DockerContainerManager.class.getCanonicalName());
 
     private final DockerUtils docker = DockerUtils.getInstance();
-    private static DockerContainerOrchestrator instance = null;
+    private static DockerContainerManager instance = null;
 
-    private DockerContainerOrchestrator() {}
+    private DockerContainerManager() {}
 
-    public static DockerContainerOrchestrator getInstance() {
+    public static DockerContainerManager getInstance() {
         if (instance == null) {
-            synchronized (DockerContainerOrchestrator.class) {
+            synchronized (DockerContainerManager.class) {
                 if (instance == null)
-                    instance = new DockerContainerOrchestrator();
+                    instance = new DockerContainerManager();
             }
         }
         return instance;
@@ -56,7 +55,7 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
 
     @Override
     public void init() {
-        StreamPipesNodeContainer.INSTANCE.get().forEach(this::deploy);
+        DockerConstants.NodeContainer.INSTANCE.getAllStreamPipesContainer().forEach(this::deploy);
     }
 
     @Override
@@ -76,14 +75,14 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
             ImmutableMap<String, ? extends Serializable> m = ImmutableMap.of(
                     "pipelineElementContainer", p.getContainerName(),
                     "containerId", containerId,
-                    "status", ContainerStatus.DEPLOYED
+                    "status", DockerConstants.ContainerStatus.DEPLOYED
             );
             return new Gson().toJson(m);
         }
         LOG.info("Container already running {}", p.getContainerName());
         ImmutableMap<String, ? extends Serializable> m = ImmutableMap.of(
                 "message", "Pipeline element container already running",
-                "status", ContainerStatus.RUNNING
+                "status", DockerConstants.ContainerStatus.RUNNING
         );
         return new Gson().toJson(m);
     }
@@ -92,7 +91,7 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
     public String remove(DockerContainer p) {
         LOG.info("Remove pipeline element container: {}", p.getImageURI());
 
-        Optional<Container> containerOptional = docker.getContainer(p.getContainerName());
+        Optional<com.spotify.docker.client.messages.Container> containerOptional = docker.getContainer(p.getContainerName());
         if(containerOptional.isPresent()) {
 
             docker.forceRemove(p.getContainerName());
@@ -104,14 +103,14 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
             ImmutableMap<String, ? extends Serializable> m = ImmutableMap.of(
                     "message",
                     "Pipeline element container removed",
-                    "status", ContainerStatus.REMOVED
+                    "status", DockerConstants.ContainerStatus.REMOVED
             );
             return new Gson().toJson(m);
         }
         ImmutableMap<String, ? extends Serializable> m = ImmutableMap.of(
                 "message",
                 "Pipeline element container does not exist",
-                "status", ContainerStatus.UNKNOWN
+                "status", DockerConstants.ContainerStatus.UNKNOWN
         );
         return new Gson().toJson(m);
     }
@@ -120,7 +119,7 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
     public String list() {
         LOG.info("List running pipeline element container");
 
-        List<Container> containerList = docker.getRunningPipelineElementContainer();
+        List<Container> containerList = docker.getRunningStreamPipesContainer();
         HashMap<String, Object> m = new HashMap<>();
         if (containerList.size() > 0) {
             for (Container c: containerList) {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/StreamPipesNodeContainer.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/StreamPipesNodeContainer.java
deleted file mode 100644
index 1a1ae30..0000000
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/StreamPipesNodeContainer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.node.DockerContainer;
-import org.apache.streampipes.model.node.DockerContainerBuilder;
-import org.apache.streampipes.node.controller.container.config.ConfigKeys;
-import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-
-import java.util.*;
-
-public enum StreamPipesNodeContainer {
-    INSTANCE;
-
-    private static final String LABEL_CONTAINER_TYPE_KEY = "org.apache.streampipes.container.type";
-    private static final String LABEL_CONTAINER_TYPE_EXTENSIONS_VALUE = "pipeline-element";
-    private static final String LABEL_CONTAINER_TYPE_BROKER_VALUE = "node-broker";
-    private static final String LABEL_NODE_TYPE_KEY = "org.apache.streampipes.node.type";
-    private static final String LABEL_NODE_TYPE_VALUE = NodeControllerConfig.INSTANCE.getNodeType();
-
-    private static final String SP_DOCKER_REPOSITORY = "apachestreampipes";
-    // TODO: get version from backend when registering node controller
-    private static final String SP_VERSION = NodeControllerConfig.INSTANCE.getSpVersion();
-    private static final String SP_CONTAINER_NAME_PREFIX = "streampipes_";
-
-    public List<DockerContainer> get() {
-        //NodeInfoStorage.getInstance().retrieveNodeInfo().getNodeResources().getHardwareResource().getCpu().getArch();
-        List<DockerContainer> nodeContainers = new ArrayList<>();
-
-        // Extensions container
-        DockerContainer extensions = DockerContainerBuilder.create("pe/org.apache.streampipes.extensions.all.jvm")
-                        .withImage(SP_DOCKER_REPOSITORY + "/" + "extensions-all-jvm" + ":" + SP_VERSION)
-                        .withName(SP_CONTAINER_NAME_PREFIX + "extensions-all-jvm")
-                        .withExposedPorts(new String[]{"7023"})
-                        .withEnvironmentVariables(makeContainerEnvVars())
-                        .withLabels(makeLabels(ContainerType.EXTENSIONS))
-                        .build();
-
-        // Node broker container
-        DockerContainer nodeBroker = DockerContainerBuilder.create("pe/org.apache.streampipes.node.broker")
-                        .withImage("eclipse-mosquitto:1.6.12")
-                        .withName(SP_CONTAINER_NAME_PREFIX + "node-broker")
-                        .withExposedPorts(new String[]{"1883"})
-                        .withEnvironmentVariables(makeContainerEnvVars())
-                        .withLabels(makeLabels(ContainerType.BROKER))
-                        .build();
-
-        nodeContainers.add(extensions);
-        nodeContainers.add(nodeBroker);
-
-        return nodeContainers;
-    }
-
-    private Map<String, String> makeLabels(ContainerType containerType) {
-        switch (containerType) {
-            case EXTENSIONS:
-                return new HashMap<String,String>(){{
-                    put(LABEL_CONTAINER_TYPE_KEY, LABEL_CONTAINER_TYPE_EXTENSIONS_VALUE);
-                    put(LABEL_NODE_TYPE_KEY, LABEL_NODE_TYPE_VALUE);}};
-            case BROKER:
-                return new HashMap<String,String>(){{
-                    put(LABEL_CONTAINER_TYPE_KEY, LABEL_CONTAINER_TYPE_BROKER_VALUE);
-                    put(LABEL_NODE_TYPE_KEY, LABEL_NODE_TYPE_VALUE);}};
-            default:
-                break;
-        }
-        throw new SpRuntimeException("Unsupported container type: " + containerType);
-    }
-
-    private List<String> makeContainerEnvVars() {
-        return Arrays.asList(
-                ConfigKeys.NODE_CONTROLLER_ID + "=" + NodeControllerConfig.INSTANCE.getNodeControllerId(),
-                ConfigKeys.NODE_CONTROLLER_CONTAINER_HOST +  "=" + NodeControllerConfig.INSTANCE.getNodeHostName(),
-                ConfigKeys.NODE_CONTROLLER_CONTAINER_PORT + "=" + NodeControllerConfig.INSTANCE.getNodeControllerPort(),
-                ConfigKeys.NODE_BROKER_CONTAINER_HOST + "=" + NodeControllerConfig.INSTANCE.getNodeBrokerHost(),
-                ConfigKeys.NODE_BROKER_CONTAINER_PORT + "=" + NodeControllerConfig.INSTANCE.getNodeControllerPort()
-        );
-    }
-}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/utils/DockerUtils.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/utils/DockerUtils.java
index 29c16ac..b91d9c5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/utils/DockerUtils.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/utils/DockerUtils.java
@@ -26,6 +26,7 @@ import com.spotify.docker.client.exceptions.DockerException;
 import com.spotify.docker.client.messages.*;
 import com.spotify.docker.client.shaded.com.google.common.collect.ImmutableList;
 import org.apache.streampipes.model.node.DockerContainer;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerConstants;
 import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,10 +40,8 @@ import java.util.stream.Collectors;
 public class DockerUtils {
     private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class.getCanonicalName());
 
-    private static final String SP_NETWORK = "spnet";
-    private static final String SP_CONTAINER_PREFIX = "streampipes_";
-    private static final String DOCKER_UNIX_SOCK = "/var/run/docker.sock";
     private static final String BLANK_SPACE = " ";
+    private static final String DOCKER_UNIX_SOCK = "/var/run/docker.sock";
     private static DockerUtils instance;
     private static DockerClient docker;
 
@@ -129,8 +128,8 @@ public class DockerUtils {
     }
 
     private String verifyContainerName(String containerName) {
-        return containerName.startsWith(SP_CONTAINER_PREFIX) ?
-                containerName : SP_CONTAINER_PREFIX + containerName;
+        return containerName.startsWith(DockerConstants.SP_DOCKER_CONTAINER_NAME_PREFIX) ?
+                containerName : DockerConstants.SP_DOCKER_CONTAINER_NAME_PREFIX + containerName;
     }
 
     private ContainerConfig getContainerConfig(DockerContainer p) {
@@ -140,8 +139,8 @@ public class DockerUtils {
                 .image(p.getImageURI())
                 .labels(p.getLabels())
                 .env(p.getEnvVars())
-                .hostConfig(getHostConfig(SP_NETWORK, p.getContainerPorts()))
-                .networkingConfig(getNetworkingConfig(SP_NETWORK, p.getContainerName()))
+                .hostConfig(getHostConfig(DockerConstants.SP_DOCKER_NETWORK_NAME, p.getContainerPorts()))
+                .networkingConfig(getNetworkingConfig(DockerConstants.SP_DOCKER_NETWORK_NAME, p.getContainerName()))
                 .build();
     }
 
@@ -207,7 +206,7 @@ public class DockerUtils {
     }
 
     public static void prune() {
-        //List<String> o = Arrays.asList("images", "containers", "volumes", "networks");
+        //List<String> pruneItems = Arrays.asList("images", "containers", "volumes", "networks");
         List<String> pruneItems = Collections.singletonList("images");
 
         for (String i: pruneItems) {
@@ -219,7 +218,7 @@ public class DockerUtils {
                                 + BLANK_SPACE
                                 + DOCKER_UNIX_SOCK
                                 + BLANK_SPACE
-                                + "http:/v" + getApiVersion() + "/" + i + "/prune");
+                                + "http:/v" + apiVersion() + "/" + i + "/prune");
 
                 BufferedReader stdInput = new BufferedReader(new
                         InputStreamReader(p.getInputStream()));
@@ -257,7 +256,7 @@ public class DockerUtils {
         return dockerInfo;
     }
 
-    private static String getApiVersion() {
+    private static String apiVersion() {
         String version = "";
         try {
             version = docker.version().apiVersion();
@@ -279,10 +278,19 @@ public class DockerUtils {
                 .isPresent();
     }
 
-    public List<Container> getRunningPipelineElementContainer() {
+    public List<Container> getRunningStreamPipesContainer() {
+
+        Map<String,String> allContainerLabels = new HashMap<>();
+        allContainerLabels.putAll(DockerConstants.SP_DOCKER_CONTAINER_EXTENSIONS_LABELS);
+        allContainerLabels.putAll(DockerConstants.SP_DOCKER_CONTAINER_BROKER_LABELS);
+
         return getContainerList()
                 .stream()
-                .filter(c -> c.labels().containsValue("pipeline-element"))
+                .filter(c -> c.labels().values().stream()
+                        .anyMatch(v -> allContainerLabels.values().stream()
+                                .anyMatch(cl -> cl.equals(v))
+                        )
+                )
                 .collect(Collectors.toList());
     }
 
@@ -294,7 +302,7 @@ public class DockerUtils {
                             + BLANK_SPACE
                             + DOCKER_UNIX_SOCK
                             + BLANK_SPACE
-                            + "http:/v" + getApiVersion() + "/info");
+                            + "http:/v" + apiVersion() + "/info");
 
             BufferedReader stdInput = new BufferedReader(new
                     InputStreamReader(p.getInputStream()));
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
index 5cc29cd..fb09cc0 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
@@ -25,7 +25,7 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.model.node.DockerContainer;
-import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerManager;
 import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
 import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
 import org.apache.streampipes.node.controller.container.management.relay.EventRelay;
@@ -48,14 +48,14 @@ public class InvocableEntityResource extends AbstractResource {
     @GET
     @Produces(MediaType.APPLICATION_JSON)
     public Response getPipelineElementContainer(){
-        return ok(DockerContainerOrchestrator.getInstance().list());
+        return ok(DockerContainerManager.getInstance().list());
     }
 
     @POST
     @Path("/deploy")
     @Consumes(MediaType.APPLICATION_JSON)
     public Response deployPipelineElementContainer(DockerContainer container) {
-        return ok(DockerContainerOrchestrator.getInstance().deploy(container));
+        return ok(DockerContainerManager.getInstance().deploy(container));
     }
 
     @POST
@@ -155,6 +155,6 @@ public class InvocableEntityResource extends AbstractResource {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response removePipelineElementContainer(DockerContainer container) {
         InvocableElementManager.getInstance().unregister();
-        return ok(DockerContainerOrchestrator.getInstance().remove(container));
+        return ok(DockerContainerManager.getInstance().remove(container));
     }
 }