You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/11/23 10:57:57 UTC
[incubator-streampipes] branch edge-extensions updated: refactor
node-controller and add initial work on local container management
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new 84d72fa refactor node-controller and add initial work on local container management
84d72fa is described below
commit 84d72fa4fde52d4ae3c252d44d1f183b8a3cc5e0
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon Nov 23 11:57:34 2020 +0100
refactor node-controller and add initial work on local container management
---
.idea/runConfigurations/all_pe_jvm__edge_01_.xml | 35 +++++
.idea/runConfigurations/all_pe_jvm__edge_02_.xml | 35 +++++
.idea/runConfigurations/all_pe_jvm__fog_.xml | 35 +++++
.idea/runConfigurations/all_pe_jvm__primary_.xml | 21 +++
.idea/runConfigurations/node_controller_cloud.xml | 32 ++++
.../runConfigurations/node_controller_edge_01.xml | 34 +++++
.../runConfigurations/node_controller_edge_02.xml | 33 +++++
.idea/runConfigurations/node_controller_fog.xml | 32 ++++
.../standalone/init/StandaloneModelSubmitter.java | 28 ++--
.../model/node/InvocableRegistration.java | 52 +++++++
.../container/util/NodeControllerUtil.java | 154 +++++++++++---------
.../streampipes/model/node/NodeInfoBuilder.java | 5 +
.../streampipes/model/node/NodeMetadata.java | 12 +-
.../model/node/PipelineElementDockerContainer.java | 17 ++-
.../PipelineElementDockerContainerBuilder.java | 73 ++++++++++
streampipes-node-controller-container/pom.xml | 10 ++
...ainer.java => NodeControllerContainerInit.java} | 15 +-
.../controller/container/config/ConfigKeys.java | 31 ++--
.../container/config/NodeControllerConfig.java | 161 ++++++++-------------
...ContainerStatus.java => IRunningInstances.java} | 17 ++-
.../container/management/info/NodeInfoStorage.java | 9 +-
.../management/node/NodeJanitorManager.java | 2 +-
.../ContainerOrchestrator.java | 7 +-
.../ContainerStatus.java | 3 +-
.../RunningContainerInstances.java} | 24 +--
.../docker/DockerContainerOrchestrator.java} | 44 +++---
.../docker}/DockerInfo.java | 3 +-
.../orchestrator/docker/DockerNodeContainer.java | 67 +++++++++
.../docker}/DockerUtils.java | 31 ++--
.../management/pe/PipelineElementManager.java | 64 +++++++-
.../management/relay/RunningRelayInstances.java | 14 +-
.../management/resource/ResourceManager.java | 1 -
.../container/rest/DebugRelayResource.java | 2 +-
...ource.java => InvocableManagementResource.java} | 69 ++++-----
.../rest/NodeControllerResourceConfig.java | 2 +-
.../manager/execution/http/GraphSubmitter.java | 8 +-
.../manager/matching/InvocationGraphBuilder.java | 6 +-
.../save-pipeline/save-pipeline.component.html | 2 +-
.../save-pipeline/save-pipeline.component.ts | 4 +-
39 files changed, 864 insertions(+), 330 deletions(-)
diff --git a/.idea/runConfigurations/all_pe_jvm__edge_01_.xml b/.idea/runConfigurations/all_pe_jvm__edge_01_.xml
new file mode 100644
index 0000000..ae3a99a
--- /dev/null
+++ b/.idea/runConfigurations/all_pe_jvm__edge_01_.xml
@@ -0,0 +1,35 @@
+<component name="ProjectRunConfigurationManager">
+ <configuration default="false" name="all-pe-jvm (edge-01)" type="Application" factoryName="Application" folderName="edge-extensions">
+ <envs>
+ <env name="SP_PORT" value="7024" />
+ <env name="SP_HOST" value="edge-01.fzi.de" />
+ <env name="SP_DEBUG" value="true" />
+ <env name="SP_COUCHDB_HOST" value="localhost" />
+ <env name="SP_JMS_HOST" value="localhost" />
+ <env name="SP_JMS_PORT" value="61616" />
+ <env name="SP_DATA_LAKE_HOST" value="localhost" />
+ <env name="SP_DATA_LAKE_PORT" value="8086" />
+ <env name="SP_BACKEND_HOST" value="localhost" />
+ <env name="SP_BACKEND_PORT" value="8030" />
+ <env name="SP_NODE_ID" value="edge-01.node-controller" />
+ <env name="SP_NODE_CONTROLLER_HOST" value="edge-01.fzi.de" />
+ <env name="SP_NODE_CONTROLLER_PORT" value="7077" />
+ </envs>
+ <option name="MAIN_CLASS_NAME" value="org.apache.streampipes.pe.jvm.AllPipelineElementsInit" />
+ <module name="streampipes-pipeline-elements-all-jvm" />
+ <extension name="net.ashald.envfile">
+ <option name="IS_ENABLED" value="true" />
+ <option name="IS_SUBST" value="false" />
+ <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+ <option name="IS_IGNORE_MISSING_FILES" value="false" />
+ <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+ <ENTRIES>
+ <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+ <ENTRY IS_ENABLED="true" PARSER="env" PATH="streampipes-pipeline-elements-all-jvm/development/env.secondary" />
+ </ENTRIES>
+ </extension>
+ <method v="2">
+ <option name="Make" enabled="true" />
+ </method>
+ </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/all_pe_jvm__edge_02_.xml b/.idea/runConfigurations/all_pe_jvm__edge_02_.xml
new file mode 100644
index 0000000..15893a6
--- /dev/null
+++ b/.idea/runConfigurations/all_pe_jvm__edge_02_.xml
@@ -0,0 +1,35 @@
+<component name="ProjectRunConfigurationManager">
+ <configuration default="false" name="all-pe-jvm (edge-02)" type="Application" factoryName="Application" folderName="edge-extensions">
+ <envs>
+ <env name="SP_PORT" value="7025" />
+ <env name="SP_HOST" value="edge-02.fzi.de" />
+ <env name="SP_DEBUG" value="true" />
+ <env name="SP_COUCHDB_HOST" value="localhost" />
+ <env name="SP_JMS_HOST" value="localhost" />
+ <env name="SP_JMS_PORT" value="61616" />
+ <env name="SP_DATA_LAKE_HOST" value="localhost" />
+ <env name="SP_DATA_LAKE_PORT" value="8086" />
+ <env name="SP_BACKEND_HOST" value="localhost" />
+ <env name="SP_BACKEND_PORT" value="8030" />
+ <env name="SP_NODE_ID" value="edge-02.node-controller" />
+ <env name="SP_NODE_CONTROLLER_HOST" value="edge-02.fzi.de" />
+ <env name="SP_NODE_CONTROLLER_PORT" value="7078" />
+ </envs>
+ <option name="MAIN_CLASS_NAME" value="org.apache.streampipes.pe.jvm.AllPipelineElementsInit" />
+ <module name="streampipes-pipeline-elements-all-jvm" />
+ <extension name="net.ashald.envfile">
+ <option name="IS_ENABLED" value="true" />
+ <option name="IS_SUBST" value="false" />
+ <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+ <option name="IS_IGNORE_MISSING_FILES" value="false" />
+ <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+ <ENTRIES>
+ <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+ <ENTRY IS_ENABLED="true" PARSER="env" PATH="streampipes-pipeline-elements-all-jvm/development/env.secondary" />
+ </ENTRIES>
+ </extension>
+ <method v="2">
+ <option name="Make" enabled="true" />
+ </method>
+ </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/all_pe_jvm__fog_.xml b/.idea/runConfigurations/all_pe_jvm__fog_.xml
new file mode 100644
index 0000000..0ece8ec
--- /dev/null
+++ b/.idea/runConfigurations/all_pe_jvm__fog_.xml
@@ -0,0 +1,35 @@
+<component name="ProjectRunConfigurationManager">
+ <configuration default="false" name="all-pe-jvm (fog)" type="Application" factoryName="Application" folderName="edge-extensions">
+ <envs>
+ <env name="SP_PORT" value="7026" />
+ <env name="SP_HOST" value="fog.fzi.de" />
+ <env name="SP_DEBUG" value="true" />
+ <env name="SP_COUCHDB_HOST" value="localhost" />
+ <env name="SP_JMS_HOST" value="localhost" />
+ <env name="SP_JMS_PORT" value="61616" />
+ <env name="SP_DATA_LAKE_HOST" value="localhost" />
+ <env name="SP_DATA_LAKE_PORT" value="8086" />
+ <env name="SP_BACKEND_HOST" value="localhost" />
+ <env name="SP_BACKEND_PORT" value="8030" />
+ <env name="SP_NODE_ID" value="fog.node-controller" />
+ <env name="SP_NODE_CONTROLLER_HOST" value="fog.fzi.de" />
+ <env name="SP_NODE_CONTROLLER_PORT" value="7079" />
+ </envs>
+ <option name="MAIN_CLASS_NAME" value="org.apache.streampipes.pe.jvm.AllPipelineElementsInit" />
+ <module name="streampipes-pipeline-elements-all-jvm" />
+ <extension name="net.ashald.envfile">
+ <option name="IS_ENABLED" value="true" />
+ <option name="IS_SUBST" value="false" />
+ <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+ <option name="IS_IGNORE_MISSING_FILES" value="false" />
+ <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+ <ENTRIES>
+ <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+ <ENTRY IS_ENABLED="true" PARSER="env" PATH="streampipes-pipeline-elements-all-jvm/development/env.secondary" />
+ </ENTRIES>
+ </extension>
+ <method v="2">
+ <option name="Make" enabled="true" />
+ </method>
+ </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/all_pe_jvm__primary_.xml b/.idea/runConfigurations/all_pe_jvm__primary_.xml
new file mode 100644
index 0000000..ce8acb6
--- /dev/null
+++ b/.idea/runConfigurations/all_pe_jvm__primary_.xml
@@ -0,0 +1,21 @@
+<component name="ProjectRunConfigurationManager">
+ <configuration default="false" name="all-pe-jvm (primary)" type="Application" factoryName="Application" folderName="edge-extensions">
+ <option name="MAIN_CLASS_NAME" value="org.apache.streampipes.pe.jvm.AllPipelineElementsInit" />
+ <module name="streampipes-pipeline-elements-all-jvm" />
+ <envs>
+ <env name="SP_PORT" value="7023" />
+ <env name="SP_HOST" value="host.docker.internal" />
+ <env name="SP_DEBUG" value="true" />
+ <env name="SP_COUCHDB_HOST" value="localhost" />
+ <env name="SP_JMS_HOST" value="localhost" />
+ <env name="SP_JMS_PORT" value="61616" />
+ <env name="SP_DATA_LAKE_HOST" value="localhost" />
+ <env name="SP_DATA_LAKE_PORT" value="8086" />
+ <env name="SP_BACKEND_HOST" value="localhost" />
+ <env name="SP_BACKEND_PORT" value="8030" />
+ </envs>
+ <method v="2">
+ <option name="Make" enabled="true" />
+ </method>
+ </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/node_controller_cloud.xml b/.idea/runConfigurations/node_controller_cloud.xml
new file mode 100644
index 0000000..6cfc672
--- /dev/null
+++ b/.idea/runConfigurations/node_controller_cloud.xml
@@ -0,0 +1,32 @@
+<component name="ProjectRunConfigurationManager">
+ <configuration default="false" name="node-controller-cloud" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" folderName="edge-extensions">
+ <module name="streampipes-node-controller-container" />
+ <extension name="net.ashald.envfile">
+ <option name="IS_ENABLED" value="true" />
+ <option name="IS_SUBST" value="false" />
+ <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+ <option name="IS_IGNORE_MISSING_FILES" value="false" />
+ <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+ <ENTRIES>
+ <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+ <ENTRY IS_ENABLED="true" PARSER="env" PATH="$PROJECT_DIR$/../incubator-streampipes/streampipes-node-controller-container/development/env" />
+ </ENTRIES>
+ </extension>
+ <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.node.controller.container.NodeControllerContainerInit" />
+ <option name="ALTERNATIVE_JRE_PATH" />
+ <envs>
+ <env name="SP_BACKEND_HOST" value="host.docker.internal" />
+ <env name="CONSUL_LOCATION" value="host.docker.internal" />
+ <env name="SP_NODE_CONTROLLER_ID" value="cloud.node-controller" />
+ <env name="SP_NODE_CONTROLLER_PORT" value="7080" />
+ <env name="SP_NODE_HOST" value="cloud.fzi.de" />
+ <env name="SP_NODE_TYPE" value="cloud" />
+ <env name="SP_NODE_HAS_GPU" value="true" />
+ <env name="SP_NODE_LOCATION_CLOUD" value="bwcloud" />
+ <env name="SP_DEBUG" value="true" />
+ </envs>
+ <method v="2">
+ <option name="Make" enabled="true" />
+ </method>
+ </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/node_controller_edge_01.xml b/.idea/runConfigurations/node_controller_edge_01.xml
new file mode 100644
index 0000000..450f314
--- /dev/null
+++ b/.idea/runConfigurations/node_controller_edge_01.xml
@@ -0,0 +1,34 @@
+<component name="ProjectRunConfigurationManager">
+ <configuration default="false" name="node-controller-edge-01" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" folderName="edge-extensions">
+ <module name="streampipes-node-controller-container" />
+ <extension name="net.ashald.envfile">
+ <option name="IS_ENABLED" value="true" />
+ <option name="IS_SUBST" value="false" />
+ <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+ <option name="IS_IGNORE_MISSING_FILES" value="false" />
+ <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+ <ENTRIES>
+ <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+ <ENTRY IS_ENABLED="true" PARSER="env" PATH="$PROJECT_DIR$/../incubator-streampipes/streampipes-node-controller-container/development/env" />
+ </ENTRIES>
+ </extension>
+ <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.node.controller.container.NodeControllerContainerInit" />
+ <option name="ALTERNATIVE_JRE_PATH" />
+ <envs>
+ <env name="SP_BACKEND_HOST" value="host.docker.internal" />
+ <env name="CONSUL_LOCATION" value="host.docker.internal" />
+ <env name="SP_NODE_CONTROLLER_ID" value="edge-01.node-controller" />
+ <env name="SP_NODE_CONTROLLER_PORT" value="7077" />
+ <env name="SP_NODE_HOST" value="edge-01.fzi.de" />
+ <env name="SP_NODE_TYPE" value="edge" />
+ <env name="SP_NODE_ACCESSIBLE_SENSOR_ACTUATOR_ROS" value="ros;universalrobot;ipe-k3s-pi1.fzi.de;network" />
+ <env name="SP_NODE_HAS_GPU" value="false" />
+ <env name="SP_NODE_LOCATION_BUILDING" value="holl" />
+ <env name="SP_NODE_LOCATION_MACHINE" value="UR" />
+ <env name="SP_DEBUG" value="true" />
+ </envs>
+ <method v="2">
+ <option name="Make" enabled="true" />
+ </method>
+ </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/node_controller_edge_02.xml b/.idea/runConfigurations/node_controller_edge_02.xml
new file mode 100644
index 0000000..c97ce50
--- /dev/null
+++ b/.idea/runConfigurations/node_controller_edge_02.xml
@@ -0,0 +1,33 @@
+<component name="ProjectRunConfigurationManager">
+ <configuration default="false" name="node-controller-edge-02" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" folderName="edge-extensions">
+ <module name="streampipes-node-controller-container" />
+ <extension name="net.ashald.envfile">
+ <option name="IS_ENABLED" value="true" />
+ <option name="IS_SUBST" value="false" />
+ <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+ <option name="IS_IGNORE_MISSING_FILES" value="false" />
+ <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+ <ENTRIES>
+ <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+ <ENTRY IS_ENABLED="true" PARSER="env" PATH="$PROJECT_DIR$/../incubator-streampipes/streampipes-node-controller-container/development/env" />
+ </ENTRIES>
+ </extension>
+ <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.node.controller.container.NodeControllerContainerInit" />
+ <option name="ALTERNATIVE_JRE_PATH" />
+ <envs>
+ <env name="SP_BACKEND_HOST" value="host.docker.internal" />
+ <env name="CONSUL_LOCATION" value="host.docker.internal" />
+ <env name="SP_NODE_CONTROLLER_ID" value="edge-02.node-controller" />
+ <env name="SP_NODE_CONTROLLER_PORT" value="7078" />
+ <env name="SP_NODE_HOST" value="edge-02.fzi.de" />
+ <env name="SP_NODE_TYPE" value="edge" />
+ <env name="SP_NODE_ACCESSIBLE_SENSOR_ACTUATOR_ZED" value="zed;sensor;/dev/video1;usb" />
+ <env name="SP_NODE_HAS_GPU" value="true" />
+ <env name="SP_NODE_LOCATION_BUILDING" value="holl" />
+ <env name="SP_DEBUG" value="true" />
+ </envs>
+ <method v="2">
+ <option name="Make" enabled="true" />
+ </method>
+ </configuration>
+</component>
\ No newline at end of file
diff --git a/.idea/runConfigurations/node_controller_fog.xml b/.idea/runConfigurations/node_controller_fog.xml
new file mode 100644
index 0000000..041e321
--- /dev/null
+++ b/.idea/runConfigurations/node_controller_fog.xml
@@ -0,0 +1,32 @@
+<component name="ProjectRunConfigurationManager">
+ <configuration default="false" name="node-controller-fog" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" folderName="edge-extensions">
+ <module name="streampipes-node-controller-container" />
+ <extension name="net.ashald.envfile">
+ <option name="IS_ENABLED" value="true" />
+ <option name="IS_SUBST" value="false" />
+ <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+ <option name="IS_IGNORE_MISSING_FILES" value="false" />
+ <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+ <ENTRIES>
+ <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+ <ENTRY IS_ENABLED="true" PARSER="env" PATH="$PROJECT_DIR$/../incubator-streampipes/streampipes-node-controller-container/development/env" />
+ </ENTRIES>
+ </extension>
+ <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.node.controller.container.NodeControllerContainerInit" />
+ <option name="ALTERNATIVE_JRE_PATH" />
+ <envs>
+ <env name="SP_BACKEND_HOST" value="host.docker.internal" />
+ <env name="CONSUL_LOCATION" value="host.docker.internal" />
+ <env name="SP_NODE_CONTROLLER_ID" value="fog.node-controller" />
+ <env name="SP_NODE_CONTROLLER_PORT" value="7079" />
+ <env name="SP_NODE_HOST" value="fog.fzi.de" />
+ <env name="SP_NODE_TYPE" value="fog" />
+ <env name="SP_NODE_HAS_GPU" value="true" />
+ <env name="SP_NODE_LOCATION_BUILDING" value="main" />
+ <env name="SP_DEBUG" value="true" />
+ </envs>
+ <method v="2">
+ <option name="Make" enabled="true" />
+ </method>
+ </configuration>
+</component>
\ No newline at end of file
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 377042f..0d28466 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.container.standalone.init;
+import org.apache.streampipes.container.util.NodeControllerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
@@ -56,17 +57,22 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter {
app.setDefaultProperties(Collections.singletonMap("server.port", peConfig.getPort()));
app.run();
- ConsulUtil.registerPeService(
- peConfig.getId(),
- peConfig.getHost(),
- peConfig.getPort()
- );
-
-// NodeUtil.registerPeService(
-// peConfig.getId(),
-// peConfig.getHost(),
-// peConfig.getPort()
-// );
+ // check wether pipeline element is managed by node controller
+ if (System.getenv("SP_NODE_ID") != null) {
+ // secondary
+ // register pipeline element service via node controller
+ NodeControllerUtil.register(
+ peConfig.getId(),
+ peConfig.getHost(),
+ peConfig.getPort(),
+ DeclarersSingleton.getInstance().getEpaDeclarers());
+ } else {
+ // primary
+ ConsulUtil.registerPeService(
+ peConfig.getId(),
+ peConfig.getHost(),
+ peConfig.getPort());
+ }
}
@PreDestroy
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/model/node/InvocableRegistration.java b/streampipes-container/src/main/java/org/apache/streampipes/container/model/node/InvocableRegistration.java
new file mode 100644
index 0000000..4605b6c
--- /dev/null
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/model/node/InvocableRegistration.java
@@ -0,0 +1,52 @@
+package org.apache.streampipes.container.model.node;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import org.apache.streampipes.container.model.consul.ConsulServiceRegistrationBody;
+
+import java.util.List;
+
+public class InvocableRegistration {
+
+ private ConsulServiceRegistrationBody consulServiceRegistrationBody;
+ private List<String> supportedPipelineElementAppIds;
+
+ public InvocableRegistration() {
+ }
+
+ public InvocableRegistration(ConsulServiceRegistrationBody consulServiceRegistrationBody,
+ List<String> supportedPipelineElementAppIds) {
+ this.consulServiceRegistrationBody = consulServiceRegistrationBody;
+ this.supportedPipelineElementAppIds = supportedPipelineElementAppIds;
+ }
+
+ public ConsulServiceRegistrationBody getConsulServiceRegistrationBody() {
+ return consulServiceRegistrationBody;
+ }
+
+ public void setConsulServiceRegistrationBody(ConsulServiceRegistrationBody consulServiceRegistrationBody) {
+ this.consulServiceRegistrationBody = consulServiceRegistrationBody;
+ }
+
+ public List<String> getSupportedPipelineElementAppIds() {
+ return supportedPipelineElementAppIds;
+ }
+
+ public void setSupportedPipelineElementAppIds(List<String> supportedPipelineElementAppIds) {
+ this.supportedPipelineElementAppIds = supportedPipelineElementAppIds;
+ }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
index b74a422..fd99736 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
@@ -16,105 +16,117 @@ package org.apache.streampipes.container.util;/*
*
*/
-import com.google.gson.Gson;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.methods.RequestBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.HttpClients;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
+import org.apache.streampipes.container.model.node.InvocableRegistration;
import org.apache.streampipes.container.model.consul.ConsulServiceRegistrationBody;
import org.apache.streampipes.container.model.consul.HealthCheckConfiguration;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class NodeControllerUtil {
static Logger LOG = LoggerFactory.getLogger(NodeControllerUtil.class);
private static final String PROTOCOL = "http://";
-
+ private static final String COLON = ":";
+ private static final String SLASH = "/";
private static final String HEALTH_CHECK_INTERVAL = "10s";
private static final String PE_SERVICE_NAME = "pe";
- private static final String PRIMARY_PE_IDENTIFIER = "primary";
- private static final String SECONDARY_PE_IDENTIFIER = "secondary";
- private static final String NODE_ID_IDENTIFIER = "SP_NODE_ID";
- private static final String SLASH = "/";
+ private static final String PE_SECONDARY_TAG = "secondary";
+ private static final String NODE_CONTROLLER_REGISTER_SVC_URL = "node/container/register";
+
+ public static void register(String serviceID, String host, int port,
+ Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+ register(PE_SERVICE_NAME, makeSvcId(host, serviceID), host, port,
+ Arrays.asList(PE_SERVICE_NAME, PE_SECONDARY_TAG), epaDeclarers);
+ }
+
+ public static void register(String svcName, String svcId, String url, int port, List<String> tag,
+ Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
- private static final String NODE_CONTROLLER_LOCATION = "SP_NODE_ID";
- private static final String NODE_CONTROLLER_REGISTER_URL = "node/pe/container/register";
+ boolean connected = false;
- // dummy class to route consul registration request to node controller instead of consul
+ while (!connected) {
+ LOG.info("Trying to register pipeline element container at node controller: " + makeRegistrationEndpoint());
+ String body = createSvcBody(svcName, svcId, url, port, tag, epaDeclarers);
+ connected = registerSvcHttpClient(body);
- public static void registerPeService(String serviceID, String url, int port) {
- String serviceLocationTag = System.getenv(NODE_ID_IDENTIFIER) == null ? PRIMARY_PE_IDENTIFIER : SECONDARY_PE_IDENTIFIER;
- String uniquePEServiceId = url + SLASH + serviceID;
- registerService(PE_SERVICE_NAME, uniquePEServiceId, url, port, Arrays.asList("pe", serviceLocationTag));
+ if (!connected) {
+ LOG.info("Retrying in 5 seconds");
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ LOG.info("Successfully registered pipeline element container: " + svcId);
}
- public static void registerService(String serviceName, String serviceID, String url, int port, List<String> tag) {
- String body = createServiceRegisterBody(serviceName, serviceID, url, port, tag);
+ private static boolean registerSvcHttpClient(String body) {
+ String endpoint = makeRegistrationEndpoint();
try {
- registerServiceHttpClient(body);
- LOG.info("Register service " + serviceID +" successful");
+ Request.Post(makeRegistrationEndpoint())
+ .bodyString(body, ContentType.APPLICATION_JSON)
+ .connectTimeout(1000)
+ .socketTimeout(100000)
+ .execute();
+ return true;
} catch (IOException e) {
- LOG.error("Register service: " + serviceID, " - " + e.toString());
+ LOG.error("Could not register at " + endpoint);
}
+ return false;
}
- private static void registerServiceHttpClient(String body) throws IOException {
-// return Request.Post("http://localhost:7077/node/pe/container/register")
-// .addHeader("accept", "application/json")
-// .body(new StringEntity(body))
-// .execute()
-// .returnResponse()
-// .getStatusLine().getStatusCode();
- HttpClient client = HttpClients.custom().build();
- HttpUriRequest request = RequestBuilder.post()
- .setUri(nodeControllerURL().toString() + SLASH + NODE_CONTROLLER_REGISTER_URL)
- .setEntity(new StringEntity(body))
- .setHeader(HttpHeaders.CONTENT_TYPE, "application/json")
- .build();
- client.execute(request);
- }
+ private static String createSvcBody(String name, String id, String url, int port, List<String> tags,
+ Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+ try {
+ ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
+ String healthCheckURL = PROTOCOL + url + COLON + port;
+ body.setID(id);
+ body.setName(name);
+ body.setTags(tags);
+ body.setAddress(PROTOCOL + url);
+ body.setPort(port);
+ body.setEnableTagOverride(true);
+ body.setCheck(new HealthCheckConfiguration("GET", healthCheckURL, HEALTH_CHECK_INTERVAL));
- private static String createServiceRegisterBody(String name, String id, String url, int port, List<String> tags) {
- String healthCheckURL = PROTOCOL + url + ":" + port;
- ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
- body.setID(id);
- body.setName(name);
- body.setTags(tags);
- body.setAddress(PROTOCOL + url);
- body.setPort(port);
- body.setEnableTagOverride(true);
- body.setCheck(new HealthCheckConfiguration("GET", healthCheckURL, HEALTH_CHECK_INTERVAL));
+ InvocableRegistration svcBody = new InvocableRegistration();
+ svcBody.setConsulServiceRegistrationBody(body);
+ svcBody.setSupportedPipelineElementAppIds(new ArrayList<>(epaDeclarers.keySet()));
- return new Gson().toJson(body);
+ return JacksonSerializer.getObjectMapper().writeValueAsString(svcBody);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ throw new IllegalArgumentException("Failure");
}
- private static URL nodeControllerURL() {
- Map<String, String> env = System.getenv();
- URL url = null;
-
- if (env.containsKey(NODE_CONTROLLER_LOCATION)) {
- try {
- url = new URL("http", env.get(NODE_CONTROLLER_LOCATION), 7077, "");
- } catch (MalformedURLException e) {
- e.printStackTrace();
- }
+ private static String makeRegistrationEndpoint() {
+ if (System.getenv("SP_NODE_CONTROLLER_HOST") != null) {
+ return PROTOCOL
+ + System.getenv("SP_NODE_CONTROLLER_HOST")
+ + COLON
+ + System.getenv("SP_NODE_CONTROLLER_PORT")
+ + SLASH
+ + NODE_CONTROLLER_REGISTER_SVC_URL;
} else {
- try {
- url = new URL("http", "localhost", 7077, "");
- } catch (MalformedURLException e) {
- e.printStackTrace();
- }
+ return PROTOCOL
+ + "localhost"
+ + COLON
+ + "7077"
+ + SLASH
+ + NODE_CONTROLLER_REGISTER_SVC_URL;
}
- return url;
+ }
+
+ private static String makeSvcId(String host, String serviceID) {
+ return host + SLASH + serviceID;
}
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
index d8a62ea..2986d4f 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeInfoBuilder.java
@@ -60,6 +60,11 @@ public class NodeInfoBuilder {
return this;
}
+ public NodeInfoBuilder withNodeType(String nodeType) {
+ this.nodeMetadata.setNodeType(nodeType);
+ return this;
+ }
+
public NodeInfoBuilder withNodeLocation(List<String> nodeLocationTags) {
this.nodeMetadata.setNodeLocationTags(nodeLocationTags);
return this;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeMetadata.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeMetadata.java
index 9195389..098d02b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeMetadata.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/NodeMetadata.java
@@ -26,17 +26,27 @@ public class NodeMetadata {
private String nodeAddress;
private String nodeModel;
+ private String nodeType;
private List<String> nodeLocationTags;
public NodeMetadata() {
}
- public NodeMetadata(String nodeAddress, String nodeModel, List<String> nodeLocationTags) {
+ public NodeMetadata(String nodeAddress, String nodeModel, String nodeType, List<String> nodeLocationTags) {
this.nodeAddress = nodeAddress;
this.nodeModel = nodeModel;
+ this.nodeType = nodeType;
this.nodeLocationTags = nodeLocationTags;
}
+ public String getNodeType() {
+ return nodeType;
+ }
+
+ public void setNodeType(String nodeType) {
+ this.nodeType = nodeType;
+ }
+
public String getNodeAddress() {
return nodeAddress;
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainer.java
index 17ad3f3..77641fa 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainer.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.model.node;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,10 +15,12 @@ package org.apache.streampipes.model.node;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.model.node;
import io.fogsy.empire.annotations.RdfProperty;
import io.fogsy.empire.annotations.RdfsClass;
import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
+import org.apache.streampipes.model.shared.annotation.TsModel;
import org.apache.streampipes.vocabulary.StreamPipes;
import javax.persistence.Entity;
@@ -28,6 +30,7 @@ import java.util.Map;
@RdfsClass(StreamPipes.PE_DOCKER_CONTAINER)
@Entity
+@TsModel
public class PipelineElementDockerContainer extends UnnamedStreamPipesEntity {
@RdfProperty(StreamPipes.PE_DOCKER_CONTAINER_IMAGE_URI)
@@ -68,12 +71,12 @@ public class PipelineElementDockerContainer extends UnnamedStreamPipesEntity {
public PipelineElementDockerContainer(PipelineElementDockerContainer other) {
super(other);
- this.imageURI = imageURI;
- this.containerName = containerName;
- this.serviceId = serviceId;
- this.containerPorts = containerPorts;
- this.envVars = envVars;
- this.labels = labels;
+ this.imageURI = other.getImageURI();
+ this.containerName = other.getContainerName();
+ this.serviceId = other.getServiceId();
+ this.containerPorts = other.getContainerPorts();
+ this.envVars = other.getEnvVars();
+ this.labels = other.getLabels();
}
public String getImageURI() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainerBuilder.java b/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainerBuilder.java
new file mode 100644
index 0000000..5e9222b
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/node/PipelineElementDockerContainerBuilder.java
@@ -0,0 +1,73 @@
+package org.apache.streampipes.model.node;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+import java.util.*;
+
+public class PipelineElementDockerContainerBuilder {
+
+ private PipelineElementDockerContainer pipelineElementDockerContainer;
+ private String imageURI;
+ private String containerName;
+ private String serviceId;
+ private final String [] containerPorts;
+ private List<String> envVars;
+ private Map<String, String> labels;
+
+ public PipelineElementDockerContainerBuilder(String id) {
+ this.pipelineElementDockerContainer = new PipelineElementDockerContainer();
+ this.pipelineElementDockerContainer.setServiceId(id);
+ this.imageURI = "";
+ this.containerName = "";
+ this.containerPorts = new String[]{};
+ this.envVars = new ArrayList<>();
+ this.labels = new HashMap<>();
+ }
+
+ public static PipelineElementDockerContainerBuilder create(String id) {
+ return new PipelineElementDockerContainerBuilder(id);
+ }
+
+ public PipelineElementDockerContainerBuilder withImage(String imageUri) {
+ this.pipelineElementDockerContainer.setImageURI(imageUri);
+ return this;
+ }
+
+ public PipelineElementDockerContainerBuilder withName(String name) {
+ this.pipelineElementDockerContainer.setContainerName(name);
+ return this;
+ }
+
+ public PipelineElementDockerContainerBuilder withExposedPorts(String[] ports) {
+ this.pipelineElementDockerContainer.setContainerPorts(ports);
+ return this;
+ }
+
+ public PipelineElementDockerContainerBuilder withEnvironmentVariables(List<String> envs) {
+ this.pipelineElementDockerContainer.setEnvVars(envs);
+ return this;
+ }
+
+ public PipelineElementDockerContainerBuilder withLabels(Map<String, String> labels) {
+ this.pipelineElementDockerContainer.setLabels(labels);
+ return this;
+ }
+
+ public PipelineElementDockerContainer build() {
+ return pipelineElementDockerContainer;
+ }
+
+}
diff --git a/streampipes-node-controller-container/pom.xml b/streampipes-node-controller-container/pom.xml
index 9f274d6..aba22fd 100644
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@ -117,6 +117,16 @@
<artifactId>streampipes-messaging-kafka</artifactId>
<version>0.68.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-rest-shared</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-rest-shared</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
<build>
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
similarity index 81%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
index 8210dc4..ece533b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainer.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerContainerInit.java
@@ -18,6 +18,9 @@ package org.apache.streampipes.node.controller.container;
*/
import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.model.node.PipelineElementDockerContainer;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
+import org.apache.streampipes.node.controller.container.management.pe.PipelineElementManager;
import org.apache.streampipes.node.controller.container.rest.NodeControllerResourceConfig;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
@@ -36,16 +39,16 @@ import java.util.Collections;
@Configuration
@EnableAutoConfiguration
@Import({ NodeControllerResourceConfig.class })
-public class NodeControllerContainer {
+public class NodeControllerContainerInit {
private static final Logger LOG =
- LoggerFactory.getLogger(NodeControllerContainer.class.getCanonicalName());
+ LoggerFactory.getLogger(NodeControllerContainerInit.class.getCanonicalName());
public static void main(String [] args) {
NodeControllerConfig nodeConfig = NodeControllerConfig.INSTANCE;
- SpringApplication app = new SpringApplication(NodeControllerContainer.class);
+ SpringApplication app = new SpringApplication(NodeControllerContainerInit.class);
app.setDefaultProperties(Collections.singletonMap("server.port", nodeConfig.getNodeControllerPort()));
app.run();
@@ -58,13 +61,17 @@ public class NodeControllerContainer {
LOG.info("Start Node Resource manager");
ResourceManager.getInstance().run();
+ if (!"true".equals(System.getenv("SP_DEBUG"))) {
+ LOG.info("Auto-deploy StreamPipes node container");
+ DockerContainerOrchestrator.getInstance().init();
+ }
+
// registration with consul here
ConsulUtil.registerNodeControllerService(
nodeConfig.getNodeServiceId(),
nodeConfig.getNodeHostName(),
nodeConfig.getNodeControllerPort()
);
-
}
@PreDestroy
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
index e3ea78e..b2eef3c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/ConfigKeys.java
@@ -1,4 +1,3 @@
-package org.apache.streampipes.node.controller.container.config;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,20 +15,22 @@ package org.apache.streampipes.node.controller.container.config;
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.config;
public class ConfigKeys {
- final static String NODE_CONTROLLER_ID_KEY = "SP_NODE_CONTROLLER_ID";
- final static String NODE_CONTROLLER_PORT_KEY = "SP_NODE_CONTROLLER_PORT";
- final static String NODE_HOST_KEY = "SP_NODE_HOST";
- final static String NODE_BROKER_HOST_KEY = "SP_NODE_BROKER_HOST";
- final static String NODE_BROKER_PORT_KEY = "SP_NODE_BROKER_PORT";
- final static String NODE_LOCATION_KEY = "SP_NODE_LOCATION";
- final static String NODE_HAS_GPU_KEY = "SP_NODE_HAS_GPU";
- final static String NODE_GPU_CUDA_CORES_KEY = "SP_NODE_GPU_CUDA_CORES";
- final static String NODE_GPU_TYPE_KEY = "SP_NODE_GPU_TYPE";
- final static String NODE_ACCESSIBLE_SENSOR_ACTUATOR_KEY = "SP_NODE_ACCESSIBLE_SENSOR_ACTUATOR";
- final static String NODE_SUPPORTED_PE_APP_ID_KEY = "SP_NODE_SUPPORTED_PE_APP_ID";
- final static String DOCKER_PRUNING_FREQ_SECS_KEY = "SP_DOCKER_PRUNING_FREQ_SECS";
- final static String NODE_RESOURCE_UPDATE_FREQ_SECS_KEY = "SP_NODE_RESOURCE_UPDATE_FREQ_SECS";
- final static String NODE_EVENT_BUFFER_SIZE = "SP_NODE_EVENT_BUFFER_SIZE";
+ public static final String NODE_TYPE = "SP_NODE_TYPE";
+ public static final String NODE_CONTROLLER_ID_KEY = "SP_NODE_CONTROLLER_ID";
+ public static final String NODE_CONTROLLER_PORT_KEY = "SP_NODE_CONTROLLER_PORT";
+ public static final String NODE_HOST_NAME_KEY = "SP_NODE_HOST";
+ public static final String NODE_BROKER_HOST_KEY = "SP_NODE_BROKER_HOST";
+ public static final String NODE_BROKER_PORT_KEY = "SP_NODE_BROKER_PORT";
+ public static final String NODE_LOCATION_KEY = "SP_NODE_LOCATION";
+ public static final String NODE_HAS_GPU_KEY = "SP_NODE_HAS_GPU";
+ public static final String NODE_GPU_CUDA_CORES_KEY = "SP_NODE_GPU_CUDA_CORES";
+ public static final String NODE_GPU_TYPE_KEY = "SP_NODE_GPU_TYPE";
+ public static final String NODE_ACCESSIBLE_SENSOR_ACTUATOR_KEY = "SP_NODE_ACCESSIBLE_SENSOR_ACTUATOR";
+ public static final String NODE_SUPPORTED_PE_APP_ID_KEY = "SP_NODE_SUPPORTED_PE_APP_ID";
+ public static final String DOCKER_PRUNING_FREQ_SECS_KEY = "SP_DOCKER_PRUNING_FREQ_SECS";
+ public static final String NODE_RESOURCE_UPDATE_FREQ_SECS_KEY = "SP_NODE_RESOURCE_UPDATE_FREQ_SECS";
+ public static final String NODE_EVENT_BUFFER_SIZE = "SP_NODE_EVENT_BUFFER_SIZE";
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
index 4f9e32f..139d88d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
@@ -28,87 +28,69 @@ public enum NodeControllerConfig {
private SpConfig config;
private static final String SLASH = "/";
- private static final String DOT = ".";
- private static final String DEFAULT_NODE_BROKER_NAME_SUFFIX = "broker";
- private static final String node_service_id = "node/org.apache.streampipes.node.controller";
+ private static final String NODE_SERVICE_ID = "node/org.apache.streampipes.node.controller";
private static final String DEFAULT_NODE_CONTROLLER_ID = "node-controller";
private static final int DEFAULT_NODE_CONTROLLER_PORT = 7077;
+ private static final String DEFAULT_NODE_TYPE = "edge";
private static final String DEFAULT_NODE_BROKER_HOST = "node-broker";
private static final int DEFAULT_NODE_BROKER_PORT = 1883;
private static final String DEFAULT_NODE_HOST_NAME = "host.docker.internal";
// Node controller configs
- private static final int DEFAULT_DOCKER_PRUNING_FREQ_SECS = 60;
+ private static final int DEFAULT_DOCKER_PRUNING_FREQ_SECS = 3600;
private static final int DEFAULT_NODE_RESOURCE_UPDATE_FREQ_SECS = 30;
private static final int DEFAULT_EVENT_BUFFER_SIZE = 1000;
NodeControllerConfig() {
- config = SpConfig.getSpConfig(node_service_id + SLASH + getNodeHostName());
+ config = SpConfig.getSpConfig(NODE_SERVICE_ID + SLASH + getNodeHostName());
config.register(ConfigKeys.NODE_CONTROLLER_ID_KEY, DEFAULT_NODE_CONTROLLER_ID, "node controller id");
- config.register(ConfigKeys.NODE_CONTROLLER_PORT_KEY,DEFAULT_NODE_CONTROLLER_PORT, "node controller port");
- config.register(ConfigKeys.NODE_HOST_KEY, "host.docker.internal", "node host name");
- config.register(ConfigKeys.NODE_LOCATION_KEY, "", "node location");
+ config.register(ConfigKeys.NODE_CONTROLLER_PORT_KEY, DEFAULT_NODE_CONTROLLER_PORT, "node controller port");
+ config.register(ConfigKeys.NODE_HOST_NAME_KEY, "host.docker.internal", "node host name");
+ config.register(ConfigKeys.NODE_TYPE, "edge", "node type");
config.register(ConfigKeys.NODE_BROKER_HOST_KEY, DEFAULT_NODE_BROKER_HOST, "node broker host");
config.register(ConfigKeys.NODE_BROKER_PORT_KEY, DEFAULT_NODE_BROKER_PORT, "node broker port");
-
}
public String getNodeServiceId() {
- return node_service_id;
+ return NODE_SERVICE_ID;
}
- /**
- *
- * @return node host name (physical DNS name or IP)
- */
public String getNodeHostName(){
- return envExist(ConfigKeys.NODE_HOST_KEY) ? getEnvAsString(ConfigKeys.NODE_HOST_KEY) :
- DEFAULT_NODE_HOST_NAME;
+ return getEnvOrDefault(ConfigKeys.NODE_HOST_NAME_KEY,
+ DEFAULT_NODE_HOST_NAME,
+ String.class);
}
- /**
- *
- * @return node-controller id
- */
public String getNodeControllerId() {
- return envExist(ConfigKeys.NODE_CONTROLLER_ID_KEY) ? getEnvAsString(ConfigKeys.NODE_CONTROLLER_ID_KEY) :
- DEFAULT_NODE_BROKER_HOST;
+ return getEnvOrDefault(ConfigKeys.NODE_CONTROLLER_ID_KEY,
+ DEFAULT_NODE_CONTROLLER_ID,
+ String.class);
}
- /**
- *
- * @return node-controller port
- */
public int getNodeControllerPort(){
- return envExist(ConfigKeys.NODE_CONTROLLER_PORT_KEY) ? getEnvAsInteger(ConfigKeys.NODE_CONTROLLER_PORT_KEY) :
- DEFAULT_NODE_CONTROLLER_PORT;
+ return getEnvOrDefault(
+ ConfigKeys.NODE_CONTROLLER_PORT_KEY,
+ DEFAULT_NODE_CONTROLLER_PORT,
+ Integer.class);
}
- /**
- *
- * @return node broker host
- */
public String getNodeBrokerHost() {
- return envExist(ConfigKeys.NODE_BROKER_HOST_KEY) ? getEnvAsString(ConfigKeys.NODE_BROKER_HOST_KEY) :
- DEFAULT_NODE_BROKER_HOST;
+ return getEnvOrDefault(
+ ConfigKeys.NODE_BROKER_HOST_KEY,
+ DEFAULT_NODE_BROKER_HOST,
+ String.class);
}
- /**
- *
- * @return node broker port
- */
// TODO: should be flexibly set due to node broker technology used
public int getNodeBrokerPort() {
- return envExist(ConfigKeys.NODE_BROKER_PORT_KEY) ? getEnvAsInteger(ConfigKeys.NODE_BROKER_PORT_KEY) :
- DEFAULT_NODE_BROKER_PORT;
+ return getEnvOrDefault(
+ ConfigKeys.NODE_BROKER_PORT_KEY,
+ DEFAULT_NODE_BROKER_PORT,
+ Integer.class);
}
- /**
- *
- * @return node location tags
- */
public List<String> getNodeLocations() {
return System.getenv()
.entrySet()
@@ -118,10 +100,6 @@ public enum NodeControllerConfig {
.collect(Collectors.toList());
}
- /**
- *
- * @return supported pipeline elements that can run on this node
- */
// TODO: get supported PE programmatically instead of environment variables
public List<String> getSupportedPipelineElements() {
return System.getenv()
@@ -132,10 +110,6 @@ public enum NodeControllerConfig {
.collect(Collectors.toList());
}
- /**
- *
- * @return return sensors/actuators that are accessible
- */
public List<AccessibleSensorActuatorResource> getAccessibleSensorActuator(){
return System.getenv()
.entrySet()
@@ -152,71 +126,62 @@ public enum NodeControllerConfig {
.collect(Collectors.toList());
}
- /**
- *
- * @return true if node has cuda capable GPU
- */
public boolean hasNodeGpu(){
- return envExist(ConfigKeys.NODE_HAS_GPU_KEY) ? getEnvAsBoolean(ConfigKeys.NODE_HAS_GPU_KEY) : false;
+ return getEnvOrDefault(
+ ConfigKeys.NODE_HAS_GPU_KEY,
+ false,
+ Boolean.class);
}
- /**
- *
- * @return number of cuda cores
- */
public int getGpuCores() {
- return envExist(ConfigKeys.NODE_GPU_CUDA_CORES_KEY) ? getEnvAsInteger(ConfigKeys.NODE_GPU_CUDA_CORES_KEY) : 0;
+ return getEnvOrDefault(
+ ConfigKeys.NODE_GPU_CUDA_CORES_KEY,
+ 0,
+ Integer.class);
}
- /**
- *
- * @return specific node GPU type
- */
public String getGpuType() {
- return envExist(ConfigKeys.NODE_GPU_TYPE_KEY) ? getEnvAsString(ConfigKeys.NODE_GPU_TYPE_KEY) : "n/a";
+ return getEnvOrDefault(
+ ConfigKeys.NODE_GPU_TYPE_KEY,
+ "n/a",
+ String.class);
}
- /**
- *
- * @return docker pruning frequency
- */
public int getPruningFreq() {
- return envExist(ConfigKeys.DOCKER_PRUNING_FREQ_SECS_KEY) ? getEnvAsInteger(ConfigKeys.DOCKER_PRUNING_FREQ_SECS_KEY) :
- DEFAULT_DOCKER_PRUNING_FREQ_SECS;
+ return getEnvOrDefault(
+ ConfigKeys.DOCKER_PRUNING_FREQ_SECS_KEY,
+ DEFAULT_DOCKER_PRUNING_FREQ_SECS,
+ Integer.class);
}
- /**
- *
- * @return node resource update frequency
- */
public int getNodeResourceUpdateFreqSecs() {
- return envExist(ConfigKeys.NODE_RESOURCE_UPDATE_FREQ_SECS_KEY) ? getEnvAsInteger(ConfigKeys.NODE_RESOURCE_UPDATE_FREQ_SECS_KEY) :
- DEFAULT_NODE_RESOURCE_UPDATE_FREQ_SECS;
+ return getEnvOrDefault(
+ ConfigKeys.NODE_RESOURCE_UPDATE_FREQ_SECS_KEY,
+ DEFAULT_NODE_RESOURCE_UPDATE_FREQ_SECS,
+ Integer.class);
}
public int getEventBufferSize() {
- return envExist(ConfigKeys.NODE_EVENT_BUFFER_SIZE) ? getEnvAsInteger(ConfigKeys.NODE_EVENT_BUFFER_SIZE) :
- DEFAULT_EVENT_BUFFER_SIZE;
- }
-
- private boolean envExist(String key) {
- return System.getenv(key) != null;
- }
-
- private String getEnv(String key) {
- return System.getenv(key);
- }
-
- private String getEnvAsString(String key) {
- return String.valueOf(getEnv(key));
+ return getEnvOrDefault(
+ ConfigKeys.NODE_EVENT_BUFFER_SIZE,
+ DEFAULT_EVENT_BUFFER_SIZE,
+ Integer.class);
}
- private int getEnvAsInteger(String key) {
- return Integer.parseInt(getEnv(key));
+ public String getNodeType() {
+ return getEnvOrDefault(
+ ConfigKeys.NODE_TYPE,
+ DEFAULT_NODE_TYPE,
+ String.class);
}
- private boolean getEnvAsBoolean(String key) {
- return Boolean.parseBoolean(getEnv(key));
+ private <T> T getEnvOrDefault(String k, T defaultValue, Class<T> type) {
+ if(type.equals(Integer.class)) {
+ return System.getenv(k) != null ? (T) Integer.valueOf(System.getenv(k)) : defaultValue;
+ } else if(type.equals(Boolean.class)) {
+ return System.getenv(k) != null ? (T) Boolean.valueOf(System.getenv(k)) : defaultValue;
+ } else {
+ return System.getenv(k) != null ? type.cast(System.getenv(k)) : defaultValue;
+ }
}
-
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
similarity index 83%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java
copy to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
index 7961894..e049a80 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+package org.apache.streampipes.node.controller.container.management;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,10 +16,13 @@ package org.apache.streampipes.node.controller.container.management.container;/*
*
*/
-public enum ContainerStatus {
- DEPLOYED,
- RUNNING,
- STOPPED,
- REMOVED,
- UNKNOWN
+public interface IRunningInstances<T> {
+
+ void add(String id, T value);
+
+ boolean isRunning(String id);
+
+ T get(String id);
+
+ void remove(String id);
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
index 246dd37..8c6f628 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/info/NodeInfoStorage.java
@@ -26,8 +26,8 @@ import org.apache.streampipes.model.node.resources.interfaces.AccessibleSensorAc
import org.apache.streampipes.model.node.resources.software.SoftwareResource;
import org.apache.streampipes.model.node.resources.software.Docker;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-import org.apache.streampipes.node.controller.container.management.container.DockerInfo;
-import org.apache.streampipes.node.controller.container.management.container.DockerUtils;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerInfo;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.SystemInfo;
@@ -74,6 +74,7 @@ public class NodeInfoStorage {
NodeInfo nodeInfo = NodeInfoBuilder.create(getNodeControllerId())
.withNodeControllerPort(getNodeControllerPort())
+ .withNodeType(getNodeType())
.withNodeHost(getNodeHost())
.withNodeLocation(getNodeLocation())
.withNodeModel(getNodeModel())
@@ -85,6 +86,10 @@ public class NodeInfoStorage {
NodeInfoStorage.getInstance().add(nodeInfo);
}
+ private static String getNodeType() {
+ return NodeControllerConfig.INSTANCE.getNodeType();
+ }
+
private static String getNodeControllerId(){
return NodeControllerConfig.INSTANCE.getNodeControllerId();
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java
index ba36aab..74deddc 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeJanitorManager.java
@@ -17,7 +17,7 @@ package org.apache.streampipes.node.controller.container.management.node;/*
*/
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-import org.apache.streampipes.node.controller.container.management.container.DockerUtils;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerOrchestrator.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerOrchestrator.java
similarity index 95%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerOrchestrator.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerOrchestrator.java
index 5f59c9f..fda9ae2 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerOrchestrator.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerOrchestrator.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,13 +15,18 @@ package org.apache.streampipes.node.controller.container.management.container;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.orchestrator;
import org.apache.streampipes.model.node.PipelineElementDockerContainer;
public interface ContainerOrchestrator {
+ void init();
+
String deploy(PipelineElementDockerContainer p);
String remove(PipelineElementDockerContainer p);
+ String list();
+
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java
similarity index 97%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java
index 7961894..00a2c75 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/ContainerStatus.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/ContainerStatus.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.container;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.orchestrator;
public enum ContainerStatus {
DEPLOYED,
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
similarity index 63%
copy from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
copy to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
index 276f488..58a88b1 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.relay;/*
+package org.apache.streampipes.node.controller.container.management.orchestrator;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,30 +16,34 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
*
*/
+import org.apache.streampipes.model.node.PipelineElementDockerContainer;
+import org.apache.streampipes.node.controller.container.management.IRunningInstances;
+
import java.util.HashMap;
import java.util.Map;
-public enum RunningRelayInstances {
+public enum RunningContainerInstances implements IRunningInstances<PipelineElementDockerContainer> {
INSTANCE;
- private final Map<String, EventRelayManager> runningInstances = new HashMap<>();
+ private final Map<String, PipelineElementDockerContainer> runningInstances = new HashMap<>();
- // TODO: persist active relays to support failure handling
- public void addRelay(String id, EventRelayManager eventRelayManager) {
- runningInstances.put(id, eventRelayManager);
+ @Override
+ public void add(String id, PipelineElementDockerContainer container) {
+ runningInstances.put(id, container);
}
+ @Override
public boolean isRunning(String id) {
return runningInstances.get(id) != null;
}
- public EventRelayManager get(String id) {
+ @Override
+ public PipelineElementDockerContainer get(String id) {
return isRunning(id) ? runningInstances.get(id) : null;
}
- public EventRelayManager removeRelay(String id) {
- EventRelayManager eventRelayManager = runningInstances.get(id);
+ @Override
+ public void remove(String id) {
runningInstances.remove(id);
- return eventRelayManager;
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerOrchestratorManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
similarity index 78%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerOrchestratorManager.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
index b47a218..943a5bd 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerOrchestratorManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerContainerOrchestrator.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -18,44 +18,48 @@ package org.apache.streampipes.node.controller.container.management.container;/*
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
-import com.google.gson.JsonArray;
import com.spotify.docker.client.exceptions.DockerException;
import com.spotify.docker.client.exceptions.NotFoundException;
import com.spotify.docker.client.messages.Container;
import org.apache.commons.lang.StringUtils;
import org.apache.streampipes.container.util.ConsulUtil;
import org.apache.streampipes.model.node.PipelineElementDockerContainer;
+import org.apache.streampipes.node.controller.container.management.orchestrator.ContainerOrchestrator;
+import org.apache.streampipes.node.controller.container.management.orchestrator.ContainerStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.*;
-public class DockerOrchestratorManager implements ContainerOrchestrator {
+public class DockerContainerOrchestrator implements ContainerOrchestrator {
private static final Logger LOG =
- LoggerFactory.getLogger(DockerOrchestratorManager.class.getCanonicalName());
+ LoggerFactory.getLogger(DockerContainerOrchestrator.class.getCanonicalName());
- private DockerUtils docker = DockerUtils.getInstance();
+ private final DockerUtils docker = DockerUtils.getInstance();
+ private static DockerContainerOrchestrator instance = null;
- private static DockerOrchestratorManager instance = null;
+ private DockerContainerOrchestrator() {}
- private DockerOrchestratorManager() {}
-
- public static DockerOrchestratorManager getInstance() {
+ public static DockerContainerOrchestrator getInstance() {
if (instance == null) {
- synchronized (DockerOrchestratorManager.class) {
+ synchronized (DockerContainerOrchestrator.class) {
if (instance == null)
- instance = new DockerOrchestratorManager();
+ instance = new DockerContainerOrchestrator();
}
}
return instance;
}
+ @Override
+ public void init() {
+ DockerNodeContainer.INSTANCE.get().forEach(this::deploy);
+ }
@Override
public String deploy(PipelineElementDockerContainer p) {
- LOG.info("Pull image and deploy pipeline element container {}", p.getContainerName());
+ LOG.info("Pull image and deploy pipeline element container {}", p.getImageURI());
Optional<Container> containerOptional = docker.getContainer(p.getContainerName());
if (!containerOptional.isPresent()) {
@@ -84,7 +88,7 @@ public class DockerOrchestratorManager implements ContainerOrchestrator {
@Override
public String remove(PipelineElementDockerContainer p) {
- LOG.info("Remove pipeline element container: {}", p.getContainerName());
+ LOG.info("Remove pipeline element container: {}", p.getImageURI());
Optional<Container> containerOptional = docker.getContainer(p.getContainerName());
if(containerOptional.isPresent()) {
@@ -110,19 +114,23 @@ public class DockerOrchestratorManager implements ContainerOrchestrator {
return new Gson().toJson(m);
}
+ @Override
public String list() {
LOG.info("List running pipeline element container");
List<Container> containerList = docker.getRunningPipelineElementContainer();
-
+ HashMap<String, Object> m = new HashMap<>();
if (containerList.size() > 0) {
- List<String> l = new ArrayList<>();
for (Container c: containerList) {
- l.add(StringUtils.remove(c.names().get(0), "/"));
+ m.put("containerName", StringUtils.remove(c.names().get(0), "/"));
+ m.put("containerId", c.id());
+ m.put("image", c.image());
+ m.put("state", c.state());
+ m.put("status", c.status());
+ m.put("labels", c.labels());
}
- return new Gson().toJson(l);
}
- return new Gson().toJson(new JsonArray());
+ return new Gson().toJson(m);
}
private String deployPipelineElementContainer(PipelineElementDockerContainer p) throws Exception {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerInfo.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerInfo.java
similarity index 99%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerInfo.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerInfo.java
index 7a2d887..50b5cef 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerInfo.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerInfo.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,7 @@ package org.apache.streampipes.node.controller.container.management.container;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;
public class DockerInfo {
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java
new file mode 100644
index 0000000..ed5dd38
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerNodeContainer.java
@@ -0,0 +1,67 @@
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import org.apache.streampipes.model.node.PipelineElementDockerContainer;
+import org.apache.streampipes.model.node.PipelineElementDockerContainerBuilder;
+import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+public enum DockerNodeContainer {
+ INSTANCE;
+
+ public List<PipelineElementDockerContainer> get() {
+ //NodeInfoStorage.getInstance().retrieveNodeInfo().getNodeResources().getHardwareResource().getCpu().getArch();
+ List<PipelineElementDockerContainer> nodeContainers = new ArrayList<>();
+
+ // PE: pipeline element JVM all container
+ PipelineElementDockerContainer processor =
+ PipelineElementDockerContainerBuilder.create("pe/org.apache.streampipes.processors.all.jvm")
+ .withImage("apachestreampipes/pipeline-elements-all-jvm:0.68.0-SNAPSHOT")
+ .withName("streampipes_pipeline-elements-all-jvm")
+ .withExposedPorts(new String[]{"7023"})
+ .withEnvironmentVariables(Arrays.asList(
+ "SP_NODE_ID=" + NodeControllerConfig.INSTANCE.getNodeBrokerHost(),
+ "SP_NODE_CONTROLLER_HOST=" + NodeControllerConfig.INSTANCE.getNodeControllerId(),
+ "SP_NODE_CONTROLLER_PORT=" + NodeControllerConfig.INSTANCE.getNodeControllerPort()
+ ))
+ .withLabels(new HashMap<String,String>(){{
+ put("org.apache.streampipes.container.type", "pipeline-element");
+ put("org.apache.streampipes.container.node.type", "edge");}})
+ .build();
+
+ // Node broker: Mosquitto
+ PipelineElementDockerContainer nodeBroker =
+ PipelineElementDockerContainerBuilder.create("pe/org.apache.streampipes.node.broker")
+ .withImage("eclipse-mosquitto:1.6.12")
+ .withName("streampipes_node-broker")
+ .withExposedPorts(new String[]{"1883"})
+ .withLabels(new HashMap<String,String>(){{
+ put("org.apache.streampipes.pe.container.type", "broker");
+ put("org.apache.streampipes.pe.container.location", "edge");}})
+ .build();
+
+ nodeContainers.add(processor);
+ nodeContainers.add(nodeBroker);
+
+ return nodeContainers;
+ }
+}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerUtils.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
similarity index 93%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerUtils.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
index 1a91296..8793685 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/container/DockerUtils.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/docker/DockerUtils.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.management.container;/*
+package org.apache.streampipes.node.controller.container.management.orchestrator.docker;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,8 +16,6 @@ package org.apache.streampipes.node.controller.container.management.container;/*
*
*/
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.spotify.docker.client.DefaultDockerClient;
@@ -37,15 +35,12 @@ import java.util.*;
import java.util.stream.Collectors;
public class DockerUtils {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(DockerUtils.class.getCanonicalName());
+ private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class.getCanonicalName());
private static final String SP_NETWORK = "spnet";
private static final String SP_CONTAINER_PREFIX = "streampipes_";
private static final String DOCKER_UNIX_SOCK = "/var/run/docker.sock";
private static final String BLANK_SPACE = " ";
-
private static DockerUtils instance;
private static DockerClient docker;
@@ -108,10 +103,10 @@ public class DockerUtils {
private String getContainerIdByName(String containerName) {
LOG.info("Get containerId by container name");
try {
- List<Container> containerList = docker.listContainers();
- return containerList.stream()
+ return docker.listContainers()
+ .stream()
.findAny()
- .filter(c -> c.names().get(0).contains(containerName))
+ .filter(c -> c.names().get(0).contains(verifyContainerName(containerName)))
.get()
.id();
} catch (DockerException | InterruptedException e) {
@@ -123,16 +118,19 @@ public class DockerUtils {
public String createContainer(PipelineElementDockerContainer p) {
LOG.info("Create pipeline element container {}", p.getContainerName());
- ContainerCreation creation;
try {
- creation = docker.createContainer(getContainerConfig(p), SP_CONTAINER_PREFIX + p.getContainerName());
- return creation.id();
+ return docker.createContainer(getContainerConfig(p), verifyContainerName(p.getContainerName())).id();
} catch (DockerException | InterruptedException e) {
LOG.error("Pipeline element container could not be created. {}", e.toString());
}
return "";
}
+ private String verifyContainerName(String containerName) {
+ return containerName.startsWith(SP_CONTAINER_PREFIX) ?
+ containerName : SP_CONTAINER_PREFIX + containerName;
+ }
+
private ContainerConfig getContainerConfig(PipelineElementDockerContainer p) {
return ContainerConfig.builder()
.hostname(p.getContainerName())
@@ -146,10 +144,9 @@ public class DockerUtils {
}
public Optional<Container> getContainer(String containerName) {
- LOG.info("Get container: {}", containerName);
- List<Container> containers = getContainerList();
- return containers.stream()
- .filter(c -> c.names().get(0).contains(containerName))
+ return getContainerList()
+ .stream()
+ .filter(c -> c.names().get(0).contains(verifyContainerName(containerName)))
.findAny();
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
index f574410..cc648f4 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/PipelineElementManager.java
@@ -16,24 +16,29 @@ package org.apache.streampipes.node.controller.container.management.pe;/*
*
*/
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.container.model.node.InvocableRegistration;
+import org.apache.streampipes.node.controller.container.management.info.NodeInfoStorage;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Collections;
public class PipelineElementManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineElementManager.class.getCanonicalName());
+ private static final String PROTOCOL = "http://";
+ private static final String COLON = ":";
+ private static final String SLASH = "/";
+ private static final String ENV_CONSUL_LOCATION = "CONSUL_LOCATION";
private static final Integer CONNECT_TIMEOUT = 10000;
-
private static PipelineElementManager instance = null;
private PipelineElementManager() {}
@@ -49,11 +54,34 @@ public class PipelineElementManager {
}
/**
- * registeration of newly started pipeline element runtime container
+ * Register pipeline element container
+ *
+ * @param invocableRegistration
*/
+ public void registerPipelineElements(InvocableRegistration invocableRegistration) {
+ try {
+ Request.Put(makeConsulRegistrationEndpoint())
+ .addHeader("accept", "application/json")
+ .body(new StringEntity(JacksonSerializer
+ .getObjectMapper()
+ .writeValueAsString(invocableRegistration.getConsulServiceRegistrationBody())))
+ .execute();
+
+ // TODO: persistent storage to survive failures
+ NodeInfoStorage.getInstance()
+ .retrieveNodeInfo()
+ .setSupportedPipelineElementAppIds(invocableRegistration.getSupportedPipelineElementAppIds());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
/**
- * invokes pipeline elements when pipeline is started
+ * Invoke pipeline elements when pipeline is started
+ *
+ * @param pipelineElementEndpoint
+ * @param payload
+ * @return
*/
public String invokePipelineElement(String pipelineElementEndpoint, String payload) {
LOG.info("Invoking element: {}", pipelineElementEndpoint);
@@ -78,4 +106,28 @@ public class PipelineElementManager {
}
+ public void unregisterPipelineElements(){
+ NodeInfoStorage.getInstance()
+ .retrieveNodeInfo()
+ .setSupportedPipelineElementAppIds(Collections.emptyList());
+ }
+
+ private String makeConsulRegistrationEndpoint() {
+ if (System.getenv(ENV_CONSUL_LOCATION) != null) {
+ return PROTOCOL
+ + System.getenv(ENV_CONSUL_LOCATION)
+ + COLON
+ + "8500"
+ + SLASH
+ + "v1/agent/service/register";
+ } else {
+ return PROTOCOL
+ + "localhost"
+ + COLON
+ + "8500"
+ + SLASH
+ + "v1/agent/service/register";
+ }
+ }
+
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index 276f488..09e373f 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -16,30 +16,34 @@ package org.apache.streampipes.node.controller.container.management.relay;/*
*
*/
+import org.apache.streampipes.node.controller.container.management.IRunningInstances;
+
import java.util.HashMap;
import java.util.Map;
-public enum RunningRelayInstances {
+public enum RunningRelayInstances implements IRunningInstances<EventRelayManager> {
INSTANCE;
private final Map<String, EventRelayManager> runningInstances = new HashMap<>();
// TODO: persist active relays to support failure handling
- public void addRelay(String id, EventRelayManager eventRelayManager) {
+ @Override
+ public void add(String id, EventRelayManager eventRelayManager) {
runningInstances.put(id, eventRelayManager);
}
+ @Override
public boolean isRunning(String id) {
return runningInstances.get(id) != null;
}
+ @Override
public EventRelayManager get(String id) {
return isRunning(id) ? runningInstances.get(id) : null;
}
- public EventRelayManager removeRelay(String id) {
- EventRelayManager eventRelayManager = runningInstances.get(id);
+ @Override
+ public void remove(String id) {
runningInstances.remove(id);
- return eventRelayManager;
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
index e9b5c31..0affe45 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/resource/ResourceManager.java
@@ -18,7 +18,6 @@ package org.apache.streampipes.node.controller.container.management.resource;/*
import com.google.gson.Gson;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-import org.apache.streampipes.node.controller.container.management.container.DockerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.SystemInfo;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
index 0ff1ca6..00dface 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/DebugRelayResource.java
@@ -35,7 +35,7 @@ public class DebugRelayResource extends AbstractNodeContainerResource {
System.out.println(msg);
EventRelayManager eventRelayManager = new EventRelayManager();
eventRelayManager.start();
- RunningRelayInstances.INSTANCE.addRelay(eventRelayManager.getRelayedTopic(), eventRelayManager);
+ RunningRelayInstances.INSTANCE.add(eventRelayManager.getRelayedTopic(), eventRelayManager);
return ok();
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java
similarity index 70%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java
index 091331b..e1e302c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/PELifeCycleResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/InvocableManagementResource.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.node.controller.container.rest;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,18 +15,21 @@ package org.apache.streampipes.node.controller.container.rest;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.rest;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.container.model.node.InvocableRegistration;
import org.apache.streampipes.container.transform.Transformer;
import org.apache.streampipes.container.util.Util;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.node.PipelineElementDockerContainer;
-import org.apache.streampipes.node.controller.container.management.container.DockerOrchestratorManager;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerOrchestrator;
import org.apache.streampipes.node.controller.container.management.pe.PipelineElementManager;
import org.apache.streampipes.node.controller.container.management.relay.EventRelayManager;
import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,64 +39,44 @@ import javax.ws.rs.core.Response;
import java.io.IOException;
@Path("/node/container")
-public class PELifeCycleResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
- private static final Logger LOG =
- LoggerFactory.getLogger(PELifeCycleResource.class.getCanonicalName());
+public class InvocableManagementResource<I extends InvocableStreamPipesEntity> extends AbstractNodeContainerResource{
+ private static final Logger LOG = LoggerFactory.getLogger(InvocableManagementResource.class.getCanonicalName());
- private static final String COLON = ":";
-
- /**
- *
- * @return a list of currently running Docker Containers
- */
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getPipelineElementContainer(){
- return ok(DockerOrchestratorManager.getInstance().list());
+ return ok(DockerContainerOrchestrator.getInstance().list());
}
- /**
- * Deploys a new Docker Container
- *
- * @param container to be deployed
- * @return deployment status
- */
@POST
@Path("/deploy")
@Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.APPLICATION_JSON)
public Response deployPipelineElementContainer(PipelineElementDockerContainer container) {
- return ok(DockerOrchestratorManager.getInstance().deploy(container));
+ return ok(DockerContainerOrchestrator.getInstance().deploy(container));
}
- /**
- * Register pipeline elements in consul
- * @param message
- * @return
- */
@POST
@Path("/register")
- @Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.APPLICATION_JSON)
- public Response registerPipelineElementInConsul(String message) {
- // TODO implement
+ public void register(String body) {
+ try {
+ InvocableRegistration invocableRegistration = JacksonSerializer
+ .getObjectMapper()
+ .readValue(body, InvocableRegistration.class);
-// HttpClient client = HttpClients.custom().build();
-// HttpUriRequest request = RequestBuilder.put()
-// .setUri("http://localhost:8500/v1/agent/service/register")
-// .setEntity(new StringEntity(message))
-// .setHeader(HttpHeaders.CONTENT_TYPE, "application/json")
-// .build();
-// client.execute(request);
+ // register pipeline elements at consul and node controller
+ PipelineElementManager.getInstance().registerPipelineElements(invocableRegistration);
+ LOG.info("Sucessfully registered pipeline element container");
- return ok();
+ } catch (IOException e) {
+ LOG.error("Could not register pipeline element container - " + e.toString());
+ }
}
@POST
@Path("/invoke/{identifier}/{elementId}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
- public Response invokePipelineElement(@PathParam("identifier") String identifier, @PathParam("elementId") String elementId, String payload) {
+ public Response invoke(@PathParam("identifier") String identifier, @PathParam("elementId") String elementId, String payload) {
// TODO implement
String pipelineElementEndpoint;
@@ -105,7 +88,7 @@ public class PELifeCycleResource<I extends InvocableStreamPipesEntity> extends A
// TODO: start event relay to remote broker
// EventRelayManager eventRelayManager = new EventRelayManager(graph);
// eventRelayManager.start();
-// RunningRelayInstances.INSTANCE.addRelay(eventRelayManager.getRelayedTopic(), eventRelayManager);
+// RunningRelayInstances.INSTANCE.add(eventRelayManager.getRelayedTopic(), eventRelayManager);
PipelineElementManager.getInstance().invokePipelineElement(graph.getBelongsTo(), payload);
}
@@ -141,8 +124,10 @@ public class PELifeCycleResource<I extends InvocableStreamPipesEntity> extends A
// TODO implement
// TODO: stop event relay to remote broker
- EventRelayManager relay = RunningRelayInstances.INSTANCE.removeRelay(appId);
+ EventRelayManager relay = RunningRelayInstances.INSTANCE.get(appId);
+ assert relay != null;
relay.stop();
+ RunningRelayInstances.INSTANCE.remove(appId);
return ok();
}
@@ -150,9 +135,9 @@ public class PELifeCycleResource<I extends InvocableStreamPipesEntity> extends A
@DELETE
@Path("/remove")
@Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.APPLICATION_JSON)
public Response removePipelineElementContainer(PipelineElementDockerContainer container) {
- return ok(DockerOrchestratorManager.getInstance().remove(container));
+ PipelineElementManager.getInstance().unregisterPipelineElements();
+ return ok(DockerContainerOrchestrator.getInstance().remove(container));
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
index 68930be..2f6f06d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/rest/NodeControllerResourceConfig.java
@@ -26,7 +26,7 @@ public class NodeControllerResourceConfig extends ResourceConfig {
public NodeControllerResourceConfig() {
register(HealthCheckResource.class);
register(InfoStatusResource.class);
- register(PELifeCycleResource.class);
+ register(InvocableManagementResource.class);
// TODO remove later - only for local relay tests
register(DebugRelayResource.class);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index 1899ee1..e29de56 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -54,9 +54,9 @@ public class GraphSubmitter {
graphs.forEach(g -> status.addPipelineElementStatus(new HttpRequestBuilder(g, g.getBelongsTo()).invoke()));
if (status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess)) {
- dataSets.forEach(dataSet ->
+ dataSets.forEach(dataSet ->
status.addPipelineElementStatus
- (new HttpRequestBuilder(dataSet, dataSet.getUri()).invoke()));
+ (new HttpRequestBuilder(dataSet, dataSet.getElementId()).invoke()));
}
status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
@@ -91,8 +91,8 @@ public class GraphSubmitter {
status.setPipelineId(pipelineId);
status.setPipelineName(pipelineName);
- graphs.forEach(g -> status.addPipelineElementStatus(new HttpRequestBuilder(g, g.getUri()).detach()));
- dataSets.forEach(dataSet -> status.addPipelineElementStatus(new HttpRequestBuilder(dataSet, dataSet.getUri() +
+ graphs.forEach(g -> status.addPipelineElementStatus(new HttpRequestBuilder(g, g.getElementId()).detach()));
+ dataSets.forEach(dataSet -> status.addPipelineElementStatus(new HttpRequestBuilder(dataSet, dataSet.getElementId() +
"/" +dataSet.getDatasetInvocationId())
.detach()));
status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
index 80c9acd..f1fe90f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/InvocationGraphBuilder.java
@@ -119,9 +119,9 @@ public class InvocationGraphBuilder {
}
}
} else {
- t.getInputStreams()
- .get(getIndex(source.getDOM(), t))
- .setEventGrounding(inputGrounding);
+ t.getInputStreams()
+ .get(getIndex(source.getDOM(), t))
+ .setEventGrounding(inputGrounding);
}
} else {
t.getInputStreams()
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
index fb900b7..d045ad5 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.html
@@ -49,7 +49,7 @@
<mat-slide-toggle color="primary" [(ngModel)]="advancedSettings">
Choose deployment options
</mat-slide-toggle>
- <mat-divider *ngIf="advancedSettings" style="margin: 2em 0 2em 0;"></mat-divider>
+<!-- <mat-divider *ngIf="advancedSettings" style="margin: 2em 0 2em 0;"></mat-divider>-->
<!-- <div *ngIf="advancedSettings">-->
<!-- <div fxFlex="100" fxLayout="row">-->
<!-- <div fxFlex="50" fxLayout="row" fxLayoutAlign="center center">-->
diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
index cf9b9f3..b40f136 100644
--- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
+++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts
@@ -122,7 +122,9 @@ export class SavePipelineComponent implements OnInit {
this.deploymentOptions[pipelineElement.appId] = [];
this.deploymentOptions[pipelineElement.appId].push(this.makeDefaultNodeInfo());
edgeNodes.forEach(nodeInfo => {
- if (nodeInfo.supportedPipelineElementAppIds.some(appId => appId === pipelineElement.appId)) {
+ // only show nodes that actually have supported pipeline elements registered
+ if (nodeInfo.supportedPipelineElementAppIds.length != 0 &&
+ nodeInfo.supportedPipelineElementAppIds.some(appId => appId === pipelineElement.appId)) {
this.deploymentOptions[pipelineElement.appId].push(nodeInfo);
}
})