You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/12/22 08:36:36 UTC
[incubator-streampipes] branch edge-extensions updated: [WIP] use DockerConstants to store all config for local deployment
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new aa02d98 [WIP] use DockerConstants to store all config for local deployment
aa02d98 is described below
commit aa02d98d6664a051f1f8c2bf38f9633472239628
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Dec 22 09:36:22 2020 +0100
[WIP] use DockerConstants to store all config for local deployment
---
.../model/node/DockerContainerBuilder.java | 10 +-
.../apache/streampipes/model/node/NodeInfo.java | 9 ++
.../streampipes/model/node/NodeInfoBuilder.java | 9 ++
...rContainerInit.java => NodeControllerInit.java} | 32 +++---
.../container/config/NodeControllerConfig.java | 2 +-
...NodeJanitorManager.java => JanitorManager.java} | 21 ++--
.../container/management/node/NodeManager.java | 30 +++++-
.../management/orchestrator/ContainerStatus.java | 26 -----
.../orchestrator/docker/ContainerType.java | 22 ----
.../orchestrator/docker/DockerConstants.java | 114 +++++++++++++++++++++
...chestrator.java => DockerContainerManager.java} | 29 +++---
.../docker/StreamPipesNodeContainer.java | 95 -----------------
.../orchestrator/docker/utils/DockerUtils.java | 34 +++---
.../container/rest/InvocableEntityResource.java | 8 +-
14 files changed, 231 insertions(+), 210 deletions(-)
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/DockerContainerBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/DockerContainerBuilder.java
index 69a58b2..b94d358 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/DockerContainerBuilder.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/DockerContainerBuilder.java
@@ -19,13 +19,13 @@ import java.util.*;
public class DockerContainerBuilder {
- private DockerContainer dockerContainer;
- private String imageURI;
- private String containerName;
+ private final DockerContainer dockerContainer;
+ private final String imageURI;
+ private final String containerName;
private String serviceId;
private final String [] containerPorts;
- private List<String> envVars;
- private Map<String, String> labels;
+ private final List<String> envVars;
+ private final Map<String, String> labels;
public DockerContainerBuilder(String id) {
this.dockerContainer = new DockerContainer();
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfo.java
index e79f254..f9c97b7 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfo.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfo.java
@@ -30,6 +30,7 @@ public class NodeInfo {
private NodeBrokerInfo nodeBrokerInfo;
private NodeResources nodeResources;
private List<String> supportedPipelineElementAppIds;
+ private List<DockerContainer> registeredDockerContainer;
public NodeInfo() {
@@ -82,4 +83,12 @@ public class NodeInfo {
public void setNodeBrokerInfo(NodeBrokerInfo nodeBrokerInfo) {
this.nodeBrokerInfo = nodeBrokerInfo;
}
+
+ public List<DockerContainer> getRegisteredDockerContainer() {
+ return registeredDockerContainer;
+ }
+
+ public void setRegisteredDockerContainer(List<DockerContainer> registeredDockerContainer) {
+ this.registeredDockerContainer = registeredDockerContainer;
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
index eb7a2f6..40483dc 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
@@ -30,6 +30,7 @@ public class NodeInfoBuilder {
private NodeMetadata nodeMetadata;
private NodeResources nodeResources;
private List<String> supportedPipelineElementAppIds;
+ private List<DockerContainer> registeredDockerContainer;
private NodeBrokerInfo nodeBrokerInfo;
private NodeInfoBuilder(String nodeId) {
@@ -38,6 +39,7 @@ public class NodeInfoBuilder {
this.nodeMetadata = new NodeMetadata();
this.nodeResources = new NodeResources();
this.supportedPipelineElementAppIds = new ArrayList<>();
+ this.registeredDockerContainer = new ArrayList<>();
this.nodeBrokerInfo = new NodeBrokerInfo();
}
@@ -99,6 +101,12 @@ public class NodeInfoBuilder {
return this;
}
+
+ public NodeInfoBuilder withRegisteredDockerContainer(List<DockerContainer> registeredDockerContainer) {
+ this.registeredDockerContainer = registeredDockerContainer;
+ return this;
+ }
+
private JmsTransportProtocol makeJmsTransportProtocol(String brokerHost, Integer brokerPort) {
return new JmsTransportProtocol(brokerHost, brokerPort);
}
@@ -109,6 +117,7 @@ public class NodeInfoBuilder {
nodeInfo.setNodeResources(nodeResources);
nodeInfo.setNodeBrokerInfo(nodeBrokerInfo);
nodeInfo.setSupportedPipelineElementAppIds(supportedPipelineElementAppIds);
+ nodeInfo.setRegisteredDockerContainer(registeredDockerContainer);
return nodeInfo;
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
similarity index 75%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
index a785ca2..6e61480 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
@@ -18,11 +18,11 @@
package org.apache.streampipes.node.controller.container;
import org.apache.streampipes.container.util.ConsulUtil;
-import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerManager;
import org.apache.streampipes.node.controller.container.rest.NodeControllerResourceConfig;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.node.controller.container.management.node.NodeManager;
-import org.apache.streampipes.node.controller.container.management.janitor.NodeJanitorManager;
+import org.apache.streampipes.node.controller.container.management.janitor.JanitorManager;
import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,38 +37,38 @@ import java.util.Collections;
@Configuration
@EnableAutoConfiguration
@Import({ NodeControllerResourceConfig.class })
-public class NodeControllerContainerInit {
+public class NodeControllerInit {
private static final Logger LOG =
- LoggerFactory.getLogger(NodeControllerContainerInit.class.getCanonicalName());
+ LoggerFactory.getLogger(NodeControllerInit.class.getCanonicalName());
public static void main(String [] args) {
- NodeControllerConfig nodeConfig = NodeControllerConfig.INSTANCE;
+ NodeControllerConfig conf = NodeControllerConfig.INSTANCE;
- SpringApplication app = new SpringApplication(NodeControllerContainerInit.class);
- app.setDefaultProperties(Collections.singletonMap("server.port", nodeConfig.getNodeControllerPort()));
+ SpringApplication app = new SpringApplication(NodeControllerInit.class);
+ app.setDefaultProperties(Collections.singletonMap("server.port", conf.getNodeControllerPort()));
app.run();
- LOG.info("Load static node metadata");
- NodeManager.init();
+ LOG.info("Load node info");
+ NodeManager.getInstance().init();
- LOG.info("Start Node Resource manager");
+ LOG.info("Start Node resource manager");
ResourceManager.getInstance().run();
if (!"true".equals(System.getenv("SP_DEBUG"))) {
LOG.info("Auto-deploy StreamPipes node container");
- DockerContainerOrchestrator.getInstance().init();
+ DockerContainerManager.getInstance().init();
- LOG.info("Start Node Janitor manager");
- NodeJanitorManager.getInstance().run();
+ LOG.info("Start Janitor manager");
+ JanitorManager.getInstance().run();
}
// registration with consul here
ConsulUtil.registerNodeService(
- nodeConfig.getNodeServiceId(),
- nodeConfig.getNodeHostName(),
- nodeConfig.getNodeControllerPort()
+ conf.getNodeServiceId(),
+ conf.getNodeHostName(),
+ conf.getNodeControllerPort()
);
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
index 958690e..6c733da 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
@@ -34,7 +34,7 @@ public enum NodeControllerConfig {
private static final String DEFAULT_NODE_CONTROLLER_ID = "node-controller";
private static final int DEFAULT_NODE_CONTROLLER_PORT = 7077;
private static final String DEFAULT_NODE_TYPE = "edge";
- private static final String DEFAULT_NODE_BROKER_HOST = "node-broker";
+ private static final String DEFAULT_NODE_BROKER_HOST = "broker";
private static final int DEFAULT_NODE_BROKER_PORT = 1883;
private static final String DEFAULT_NODE_HOST_NAME = "host.docker.internal";
private static final String DEFAULT_BACKEND_HOST = "host.docker.internal";
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/JanitorManager.java
similarity index 74%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/JanitorManager.java
index b30b2c9..818e7eb 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/NodeJanitorManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/janitor/JanitorManager.java
@@ -26,22 +26,20 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-public class NodeJanitorManager {
+public class JanitorManager {
private static final Logger LOG =
- LoggerFactory.getLogger(NodeJanitorManager.class.getCanonicalName());
+ LoggerFactory.getLogger(JanitorManager.class.getCanonicalName());
- private ScheduledExecutorService scheduledExecutorService = null;
+ private static JanitorManager instance = null;
- private static NodeJanitorManager instance = null;
+ private JanitorManager() {}
- private NodeJanitorManager() {}
-
- public static NodeJanitorManager getInstance() {
+ public static JanitorManager getInstance() {
if (instance == null) {
- synchronized (NodeJanitorManager.class) {
+ synchronized (JanitorManager.class) {
if (instance == null)
- instance = new NodeJanitorManager();
+ instance = new JanitorManager();
}
}
return instance;
@@ -51,8 +49,9 @@ public class NodeJanitorManager {
public void run() {
LOG.debug("Create Janitor scheduler");
- scheduledExecutorService = Executors.newScheduledThreadPool(1);
- scheduledExecutorService.scheduleAtFixedRate(pruneDocker, 30, NodeControllerConfig.INSTANCE.getPruningFreq(), TimeUnit.MINUTES);
+ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ scheduledExecutorService.scheduleAtFixedRate(pruneDocker, 30,
+ NodeControllerConfig.INSTANCE.getPruningFreq(), TimeUnit.MINUTES);
}
private final Runnable pruneDocker = () -> {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
index 910aae4..a5f57e1 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.streampipes.node.controller.container.management.node;
+import com.spotify.docker.client.messages.Container;
import org.apache.streampipes.model.node.*;
import org.apache.streampipes.model.node.resources.hardware.HardwareResource;
import org.apache.streampipes.model.node.resources.hardware.CPU;
@@ -39,6 +40,8 @@ import oshi.software.os.OSFileStore;
import oshi.software.os.OperatingSystem;
import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
public class NodeManager {
@@ -71,7 +74,7 @@ public class NodeManager {
return nodeInfo;
}
- public static void init() {
+ public void init() {
NodeInfo nodeInfo = NodeInfoBuilder.create(getNodeControllerId())
.withNodeControllerPort(getNodeControllerPort())
@@ -81,12 +84,36 @@ public class NodeManager {
.withNodeModel(getNodeModel())
.withNodeResources(getNodeResources())
.withNodeBroker(getNodeBrokerHost(), getNodeBrokerPort())
+ .withRegisteredDockerContainer(getRegisteredDockerContainer())
.withSupportedPipelineElements(getSupportedPipelineElements())
.build();
NodeManager.getInstance().add(nodeInfo);
}
+ private static List<DockerContainer> getRegisteredDockerContainer() {
+ List<DockerContainer> containers = new ArrayList<>();
+ DockerUtils.getInstance().getRunningStreamPipesContainer()
+ .forEach(rc -> {
+ DockerContainer c = new DockerContainer();
+ c.setContainerName(rc.names().get(0));
+ c.setImageURI(rc.image());
+
+ Optional<String> serviceId = rc.labels().entrySet().stream()
+ .filter(l -> l.getKey().contains("org.apache.streampipes.service.id"))
+ .map(Map.Entry::getValue)
+ .findFirst();
+
+ serviceId.ifPresent(c::setServiceId);
+
+ c.setLabels(rc.labels().entrySet().stream()
+ .filter(l -> l.getKey().contains("org.apache.streampipes"))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ containers.add(c);
+ });
+ return containers;
+ }
+
private static String getNodeType() {
return NodeControllerConfig.INSTANCE.getNodeType();
}
@@ -201,7 +228,6 @@ public class NodeManager {
}
private static Docker getDocker() {
-
Docker docker = new Docker();
docker.setHasDocker(true);
docker.setHasNvidiaRuntime(DockerInfo.isHasNvidiaRuntime());
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java
deleted file mode 100644
index 00a2c75..0000000
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.controller.container.management.orchestrator;
-
-public enum ContainerStatus {
- DEPLOYED,
- RUNNING,
- STOPPED,
- REMOVED,
- UNKNOWN
-}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/ContainerType.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/ContainerType.java
deleted file mode 100644
index e5e3924..0000000
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/ContainerType.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
-
-public enum ContainerType {
- EXTENSIONS, BROKER
-}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerConstants.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerConstants.java
new file mode 100644
index 0000000..d5b0a98
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerConstants.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
+
+import org.apache.streampipes.model.node.DockerContainer;
+import org.apache.streampipes.model.node.DockerContainerBuilder;
+import org.apache.streampipes.node.controller.container.config.ConfigKeys;
+import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
+
+import java.util.*;
+
+public class DockerConstants {
+
+ public static final String SP_DOCKER_CONTAINER_REPOSITORY = "apachestreampipes";
+ public static final String SP_VERSION = NodeControllerConfig.INSTANCE.getSpVersion();
+ public static final String SP_DOCKER_CONTAINER_NAME_PREFIX = "streampipes_";
+ public static final String SP_DOCKER_NETWORK_NAME = "spnet";
+
+ // extensions container
+ public static final String SP_EXTENSIONS_ID = "pe/org.apache.streampipes.extensions.all.jvm";
+ public static final String SP_DOCKER_IMAGE_EXTENSIONS_NAME = "extensions-all-jvm";
+ public static final String SP_DOCKER_CONTAINER_EXTENSIONS_IMG_TAG = SP_DOCKER_CONTAINER_REPOSITORY +
+ "/" + SP_DOCKER_IMAGE_EXTENSIONS_NAME + ":" + SP_VERSION;
+ public static final String SP_DOCKER_CONTAINER_EXTENSIONS_NAME = SP_DOCKER_CONTAINER_NAME_PREFIX + "extensions";
+ public static final String[] SP_DOCKER_CONTAINER_EXTENSIONS_PORT = new String[]{"7025"};
+
+ //broker container
+ public static final String SP_BROKER_ID = "pe/org.apache.streampipes.node.broker";
+ public static final String SP_DOCKER_IMAGE_BROKER_NAME = "eclipse-mosquitto";
+ public static final String SP_DOCKER_IMAGE_BROKER_VERSION = "1.6.12";
+ public static final String SP_DOCKER_CONTAINER_BROKER_IMG_TAG =
+ SP_DOCKER_IMAGE_BROKER_NAME + ":" + SP_DOCKER_IMAGE_BROKER_VERSION;
+ public static final String SP_DOCKER_CONTAINER_BROKER_NAME = SP_DOCKER_CONTAINER_NAME_PREFIX + "broker";
+ public static final String[] SP_DOCKER_CONTAINER_BROKER_PORT =
+ System.getenv(ConfigKeys.NODE_BROKER_CONTAINER_PORT) != null ?
+ new String[]{System.getenv(ConfigKeys.NODE_BROKER_CONTAINER_PORT)} : new String[]{"1883"};
+
+ public static final List<String> SP_DOCKER_CONTAINER_ENV_VARIABLES = Arrays.asList(
+ ConfigKeys.NODE_CONTROLLER_ID + "=" + NodeControllerConfig.INSTANCE.getNodeControllerId(),
+ ConfigKeys.NODE_CONTROLLER_CONTAINER_HOST + "=" + NodeControllerConfig.INSTANCE.getNodeHostName(),
+ ConfigKeys.NODE_CONTROLLER_CONTAINER_PORT + "=" + NodeControllerConfig.INSTANCE.getNodeControllerPort(),
+ ConfigKeys.NODE_BROKER_CONTAINER_HOST + "=" + NodeControllerConfig.INSTANCE.getNodeBrokerHost(),
+ ConfigKeys.NODE_BROKER_CONTAINER_PORT + "=" + NodeControllerConfig.INSTANCE.getNodeBrokerPort()
+ );
+
+ public static final Map<String, String> SP_DOCKER_CONTAINER_EXTENSIONS_LABELS = new HashMap<String, String>() {{
+ put("org.apache.streampipes.service.id", SP_EXTENSIONS_ID);
+ put("org.apache.streampipes.node.type", NodeControllerConfig.INSTANCE.getNodeType());
+ put("org.apache.streampipes.container.type", "extensions");
+ }};
+
+ public static final Map<String, String> SP_DOCKER_CONTAINER_BROKER_LABELS = new HashMap<String, String>() {{
+ put("org.apache.streampipes.service.id", SP_BROKER_ID);
+ put("org.apache.streampipes.node.type", NodeControllerConfig.INSTANCE.getNodeType());
+ put("org.apache.streampipes.container.type", "broker");
+ }};
+
+
+ public enum ContainerStatus {
+ DEPLOYED, RUNNING, STOPPED, REMOVED, UNKNOWN
+ }
+
+ public enum ContainerType {
+ EXTENSIONS, BROKER
+ }
+
+ public enum NodeContainer {
+ INSTANCE;
+
+ public List<DockerContainer> getAllStreamPipesContainer() {
+ List<DockerContainer> nodeContainers = new ArrayList<>();
+
+ // Extensions container
+ DockerContainer extensions = DockerContainerBuilder.create(SP_EXTENSIONS_ID)
+ .withImage(DockerConstants.SP_DOCKER_CONTAINER_EXTENSIONS_IMG_TAG)
+ .withName(DockerConstants.SP_DOCKER_CONTAINER_EXTENSIONS_NAME)
+ .withExposedPorts(DockerConstants.SP_DOCKER_CONTAINER_EXTENSIONS_PORT)
+ .withEnvironmentVariables(SP_DOCKER_CONTAINER_ENV_VARIABLES)
+ .withLabels(DockerConstants.SP_DOCKER_CONTAINER_EXTENSIONS_LABELS)
+ .build();
+
+ // Node broker container
+ DockerContainer nodeBroker = DockerContainerBuilder.create(SP_BROKER_ID)
+ .withImage(DockerConstants.SP_DOCKER_CONTAINER_BROKER_IMG_TAG)
+ .withName(DockerConstants.SP_DOCKER_CONTAINER_BROKER_NAME)
+ .withExposedPorts(DockerConstants.SP_DOCKER_CONTAINER_BROKER_PORT)
+ .withEnvironmentVariables(SP_DOCKER_CONTAINER_ENV_VARIABLES)
+ .withLabels(SP_DOCKER_CONTAINER_BROKER_LABELS)
+ .build();
+
+ nodeContainers.add(extensions);
+ nodeContainers.add(nodeBroker);
+
+ return nodeContainers;
+ }
+
+ }
+
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerManager.java
similarity index 84%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerManager.java
index fff9158..f07446a 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerManager.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.streampipes.container.util.ConsulUtil;
import org.apache.streampipes.model.node.DockerContainer;
import org.apache.streampipes.node.controller.container.management.orchestrator.ContainerOrchestrator;
-import org.apache.streampipes.node.controller.container.management.orchestrator.ContainerStatus;
import org.apache.streampipes.node.controller.container.management.orchestrator.docker.utils.DockerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,21 +33,21 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.*;
-public class DockerContainerOrchestrator implements ContainerOrchestrator {
+public class DockerContainerManager implements ContainerOrchestrator {
private static final Logger LOG =
- LoggerFactory.getLogger(DockerContainerOrchestrator.class.getCanonicalName());
+ LoggerFactory.getLogger(DockerContainerManager.class.getCanonicalName());
private final DockerUtils docker = DockerUtils.getInstance();
- private static DockerContainerOrchestrator instance = null;
+ private static DockerContainerManager instance = null;
- private DockerContainerOrchestrator() {}
+ private DockerContainerManager() {}
- public static DockerContainerOrchestrator getInstance() {
+ public static DockerContainerManager getInstance() {
if (instance == null) {
- synchronized (DockerContainerOrchestrator.class) {
+ synchronized (DockerContainerManager.class) {
if (instance == null)
- instance = new DockerContainerOrchestrator();
+ instance = new DockerContainerManager();
}
}
return instance;
@@ -56,7 +55,7 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
@Override
public void init() {
- StreamPipesNodeContainer.INSTANCE.get().forEach(this::deploy);
+ DockerConstants.NodeContainer.INSTANCE.getAllStreamPipesContainer().forEach(this::deploy);
}
@Override
@@ -76,14 +75,14 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
ImmutableMap<String, ? extends Serializable> m = ImmutableMap.of(
"pipelineElementContainer", p.getContainerName(),
"containerId", containerId,
- "status", ContainerStatus.DEPLOYED
+ "status", DockerConstants.ContainerStatus.DEPLOYED
);
return new Gson().toJson(m);
}
LOG.info("Container already running {}", p.getContainerName());
ImmutableMap<String, ? extends Serializable> m = ImmutableMap.of(
"message", "Pipeline element container already running",
- "status", ContainerStatus.RUNNING
+ "status", DockerConstants.ContainerStatus.RUNNING
);
return new Gson().toJson(m);
}
@@ -92,7 +91,7 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
public String remove(DockerContainer p) {
LOG.info("Remove pipeline element container: {}", p.getImageURI());
- Optional<Container> containerOptional = docker.getContainer(p.getContainerName());
+ Optional<com.spotify.docker.client.messages.Container> containerOptional = docker.getContainer(p.getContainerName());
if(containerOptional.isPresent()) {
docker.forceRemove(p.getContainerName());
@@ -104,14 +103,14 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
ImmutableMap<String, ? extends Serializable> m = ImmutableMap.of(
"message",
"Pipeline element container removed",
- "status", ContainerStatus.REMOVED
+ "status", DockerConstants.ContainerStatus.REMOVED
);
return new Gson().toJson(m);
}
ImmutableMap<String, ? extends Serializable> m = ImmutableMap.of(
"message",
"Pipeline element container does not exist",
- "status", ContainerStatus.UNKNOWN
+ "status", DockerConstants.ContainerStatus.UNKNOWN
);
return new Gson().toJson(m);
}
@@ -120,7 +119,7 @@ public class DockerContainerOrchestrator implements ContainerOrchestrator {
public String list() {
LOG.info("List running pipeline element container");
- List<Container> containerList = docker.getRunningPipelineElementContainer();
+ List<Container> containerList = docker.getRunningStreamPipesContainer();
HashMap<String, Object> m = new HashMap<>();
if (containerList.size() > 0) {
for (Container c: containerList) {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/StreamPipesNodeContainer.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/StreamPipesNodeContainer.java
deleted file mode 100644
index 1a1ae30..0000000
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/StreamPipesNodeContainer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.node.DockerContainer;
-import org.apache.streampipes.model.node.DockerContainerBuilder;
-import org.apache.streampipes.node.controller.container.config.ConfigKeys;
-import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-
-import java.util.*;
-
-public enum StreamPipesNodeContainer {
- INSTANCE;
-
- private static final String LABEL_CONTAINER_TYPE_KEY = "org.apache.streampipes.container.type";
- private static final String LABEL_CONTAINER_TYPE_EXTENSIONS_VALUE = "pipeline-element";
- private static final String LABEL_CONTAINER_TYPE_BROKER_VALUE = "node-broker";
- private static final String LABEL_NODE_TYPE_KEY = "org.apache.streampipes.node.type";
- private static final String LABEL_NODE_TYPE_VALUE = NodeControllerConfig.INSTANCE.getNodeType();
-
- private static final String SP_DOCKER_REPOSITORY = "apachestreampipes";
- // TODO: get version from backend when registering node controller
- private static final String SP_VERSION = NodeControllerConfig.INSTANCE.getSpVersion();
- private static final String SP_CONTAINER_NAME_PREFIX = "streampipes_";
-
- public List<DockerContainer> get() {
- //NodeInfoStorage.getInstance().retrieveNodeInfo().getNodeResources().getHardwareResource().getCpu().getArch();
- List<DockerContainer> nodeContainers = new ArrayList<>();
-
- // Extensions container
- DockerContainer extensions = DockerContainerBuilder.create("pe/org.apache.streampipes.extensions.all.jvm")
- .withImage(SP_DOCKER_REPOSITORY + "/" + "extensions-all-jvm" + ":" + SP_VERSION)
- .withName(SP_CONTAINER_NAME_PREFIX + "extensions-all-jvm")
- .withExposedPorts(new String[]{"7023"})
- .withEnvironmentVariables(makeContainerEnvVars())
- .withLabels(makeLabels(ContainerType.EXTENSIONS))
- .build();
-
- // Node broker container
- DockerContainer nodeBroker = DockerContainerBuilder.create("pe/org.apache.streampipes.node.broker")
- .withImage("eclipse-mosquitto:1.6.12")
- .withName(SP_CONTAINER_NAME_PREFIX + "node-broker")
- .withExposedPorts(new String[]{"1883"})
- .withEnvironmentVariables(makeContainerEnvVars())
- .withLabels(makeLabels(ContainerType.BROKER))
- .build();
-
- nodeContainers.add(extensions);
- nodeContainers.add(nodeBroker);
-
- return nodeContainers;
- }
-
- private Map<String, String> makeLabels(ContainerType containerType) {
- switch (containerType) {
- case EXTENSIONS:
- return new HashMap<String,String>(){{
- put(LABEL_CONTAINER_TYPE_KEY, LABEL_CONTAINER_TYPE_EXTENSIONS_VALUE);
- put(LABEL_NODE_TYPE_KEY, LABEL_NODE_TYPE_VALUE);}};
- case BROKER:
- return new HashMap<String,String>(){{
- put(LABEL_CONTAINER_TYPE_KEY, LABEL_CONTAINER_TYPE_BROKER_VALUE);
- put(LABEL_NODE_TYPE_KEY, LABEL_NODE_TYPE_VALUE);}};
- default:
- break;
- }
- throw new SpRuntimeException("Unsupported container type: " + containerType);
- }
-
- private List<String> makeContainerEnvVars() {
- return Arrays.asList(
- ConfigKeys.NODE_CONTROLLER_ID + "=" + NodeControllerConfig.INSTANCE.getNodeControllerId(),
- ConfigKeys.NODE_CONTROLLER_CONTAINER_HOST + "=" + NodeControllerConfig.INSTANCE.getNodeHostName(),
- ConfigKeys.NODE_CONTROLLER_CONTAINER_PORT + "=" + NodeControllerConfig.INSTANCE.getNodeControllerPort(),
- ConfigKeys.NODE_BROKER_CONTAINER_HOST + "=" + NodeControllerConfig.INSTANCE.getNodeBrokerHost(),
- ConfigKeys.NODE_BROKER_CONTAINER_PORT + "=" + NodeControllerConfig.INSTANCE.getNodeControllerPort()
- );
- }
-}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/utils/DockerUtils.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/utils/DockerUtils.java
index 29c16ac..b91d9c5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/utils/DockerUtils.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/utils/DockerUtils.java
@@ -26,6 +26,7 @@ import com.spotify.docker.client.exceptions.DockerException;
import com.spotify.docker.client.messages.*;
import com.spotify.docker.client.shaded.com.google.common.collect.ImmutableList;
import org.apache.streampipes.model.node.DockerContainer;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerConstants;
import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +40,8 @@ import java.util.stream.Collectors;
public class DockerUtils {
private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class.getCanonicalName());
- private static final String SP_NETWORK = "spnet";
- private static final String SP_CONTAINER_PREFIX = "streampipes_";
- private static final String DOCKER_UNIX_SOCK = "/var/run/docker.sock";
private static final String BLANK_SPACE = " ";
+ private static final String DOCKER_UNIX_SOCK = "/var/run/docker.sock";
private static DockerUtils instance;
private static DockerClient docker;
@@ -129,8 +128,8 @@ public class DockerUtils {
}
private String verifyContainerName(String containerName) {
- return containerName.startsWith(SP_CONTAINER_PREFIX) ?
- containerName : SP_CONTAINER_PREFIX + containerName;
+ // A name that already carries the StreamPipes container prefix is returned as is.
+ if (containerName.startsWith(DockerConstants.SP_DOCKER_CONTAINER_NAME_PREFIX)) {
+ return containerName;
+ }
+ // Otherwise prepend the prefix.
+ return DockerConstants.SP_DOCKER_CONTAINER_NAME_PREFIX + containerName;
}
private ContainerConfig getContainerConfig(DockerContainer p) {
@@ -140,8 +139,8 @@ public class DockerUtils {
.image(p.getImageURI())
.labels(p.getLabels())
.env(p.getEnvVars())
- .hostConfig(getHostConfig(SP_NETWORK, p.getContainerPorts()))
- .networkingConfig(getNetworkingConfig(SP_NETWORK, p.getContainerName()))
+ .hostConfig(getHostConfig(DockerConstants.SP_DOCKER_NETWORK_NAME, p.getContainerPorts()))
+ .networkingConfig(getNetworkingConfig(DockerConstants.SP_DOCKER_NETWORK_NAME, p.getContainerName()))
.build();
}
@@ -207,7 +206,7 @@ public class DockerUtils {
}
public static void prune() {
- //List<String> o = Arrays.asList("images", "containers", "volumes", "networks");
+ //List<String> pruneItems = Arrays.asList("images", "containers", "volumes", "networks");
List<String> pruneItems = Collections.singletonList("images");
for (String i: pruneItems) {
@@ -219,7 +218,7 @@ public class DockerUtils {
+ BLANK_SPACE
+ DOCKER_UNIX_SOCK
+ BLANK_SPACE
- + "http:/v" + getApiVersion() + "/" + i + "/prune");
+ + "http:/v" + apiVersion() + "/" + i + "/prune");
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
@@ -257,7 +256,7 @@ public class DockerUtils {
return dockerInfo;
}
- private static String getApiVersion() {
+ private static String apiVersion() {
String version = "";
try {
version = docker.version().apiVersion();
@@ -279,10 +278,19 @@ public class DockerUtils {
.isPresent();
}
- public List<Container> getRunningPipelineElementContainer() {
+ /**
+ * Returns the containers from {@code getContainerList()} that carry at least one label value
+ * declared for the StreamPipes extensions or node broker containers in {@link DockerConstants}.
+ *
+ * @return the matching containers; an empty list if none matches
+ */
+ public List<Container> getRunningStreamPipesContainer() {
+ // Collect the label VALUES of both maps. Merging the maps themselves via putAll() would
+ // silently drop the extensions value for every key the two maps have in common.
+ java.util.Set<String> streamPipesLabelValues = new java.util.HashSet<>();
+ streamPipesLabelValues.addAll(DockerConstants.SP_DOCKER_CONTAINER_EXTENSIONS_LABELS.values());
+ streamPipesLabelValues.addAll(DockerConstants.SP_DOCKER_CONTAINER_BROKER_LABELS.values());
+
return getContainerList()
.stream()
- .filter(c -> c.labels().containsValue("pipeline-element"))
+ // NOTE(review): labels() is presumably nullable for unlabeled containers - confirm against the client API
+ .filter(c -> c.labels() != null
+ && c.labels().values().stream().anyMatch(streamPipesLabelValues::contains))
.collect(Collectors.toList());
}
@@ -294,7 +302,7 @@ public class DockerUtils {
+ BLANK_SPACE
+ DOCKER_UNIX_SOCK
+ BLANK_SPACE
- + "http:/v" + getApiVersion() + "/info");
+ + "http:/v" + apiVersion() + "/info");
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
index 5cc29cd..fb09cc0 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableEntityResource.java
@@ -25,7 +25,7 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.node.DockerContainer;
-import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerManager;
import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
import org.apache.streampipes.node.controller.container.management.relay.EventRelay;
@@ -48,14 +48,14 @@ public class InvocableEntityResource extends AbstractResource {
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getPipelineElementContainer(){
- return ok(DockerContainerOrchestrator.getInstance().list());
+ return ok(DockerContainerManager.getInstance().list());
}
@POST
@Path("/deploy")
@Consumes(MediaType.APPLICATION_JSON)
public Response deployPipelineElementContainer(DockerContainer container) {
- return ok(DockerContainerOrchestrator.getInstance().deploy(container));
+ return ok(DockerContainerManager.getInstance().deploy(container));
}
@POST
@@ -155,6 +155,6 @@ public class InvocableEntityResource extends AbstractResource {
@Consumes(MediaType.APPLICATION_JSON)
public Response removePipelineElementContainer(DockerContainer container) {
InvocableElementManager.getInstance().unregister();
- return ok(DockerContainerOrchestrator.getInstance().remove(container));
+ return ok(DockerContainerManager.getInstance().remove(container));
}
}