You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/01/08 23:00:57 UTC
[incubator-streampipes] 02/02: merged dev and refactored node
controller api
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 503ebb814caa093970b05dbeb4add7584c0d360c
Merge: 9049d91 5efd810
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri Jan 8 23:59:18 2021 +0100
merged dev and refactored node controller api
README.md | 6 +-
pom.xml | 36 ++++-
.../backend/StreamPipesBackendApplication.java | 54 ++++++-
.../backend/StreamPipesResourceConfig.java | 1 +
streampipes-code-generation/pom.xml | 9 ++
streampipes-commons/pom.xml | 8 -
.../java/org/apache/streampipes/commons/Utils.java | 36 -----
.../master/management/AdapterMasterManagement.java | 15 +-
.../container/master/rest/SourcesResource.java | 33 ++--
.../connect/container/worker/utils/Utils.java | 38 +----
streampipes-connect-container/pom.xml | 2 +-
.../connect/init/AdapterContainerConfig.java | 4 +-
.../connect/management/AdapterDeserializer.java | 41 -----
.../connect/management/AdapterUtilsTest.java | 2 +-
.../utils/ConnectContainerResourceTest.java | 175 ---------------------
streampipes-connect/pom.xml | 2 +-
.../extensions/ExtensionsResourceConfig.java | 9 +-
.../PipelineElementContainerResourceConfig.java | 19 +--
streampipes-container/pom.xml | 8 +-
...t.java => AbstractPipelineElementResource.java} | 132 ++++++++--------
...a => DataProcessorPipelineElementResource.java} | 16 +-
...t.java => DataSinkPipelineElementResource.java} | 56 +------
...java => DataSourcePipelineElementResource.java} | 53 +++----
....java => InvocablePipelineElementResource.java} | 58 +++----
...t.java => PipelineElementTemplateResource.java} | 5 +-
.../container/init/DeclarersSingleton.java | 16 +-
.../container/transform/Transformer.java | 49 ------
.../streampipes/container/util/DeclarerUtils.java | 46 ------
.../container/util/NodeControllerUtil.java | 2 +-
.../apache/streampipes/container/util/Util.java | 17 +-
.../org/streampipes/container/util/UtilTest.java | 46 ------
streampipes-model/pom.xml | 12 +-
.../org/apache/streampipes/model/Response.java | 30 ++--
.../java/org/apache/streampipes/model/Tuple2.java | 30 ++--
.../model/base/AbstractStreamPipesEntity.java | 27 +++-
.../model/base/NamedStreamPipesEntity.java | 17 --
.../model/base/UnnamedStreamPipesEntity.java | 17 --
.../streampipes/model/pipeline/Pipeline.java | 10 ++
.../apache/streampipes/model/util/ModelUtils.java | 14 --
.../org/apache/streampipes/model/ResponseTest.java | 45 ------
streampipes-node-controller-container/pom.xml | 1 -
.../controller/container/NodeControllerInit.java | 2 +-
.../container/{rest => api}/AbstractResource.java | 2 +-
.../api/AdapterDataStreamRelayResource.java | 46 ++++++
.../container/{rest => api}/ConnectResource.java | 5 +-
.../container/api/ContainerResource.java | 50 ++++++
.../api/DataProcessorPipelineElementResource.java | 20 ++-
.../api/DataSinkPipelineElementResource.java | 20 +--
.../{rest => api}/HealthCheckResource.java | 4 +-
.../{rest => api}/InfoStatusResource.java | 37 +----
.../container/api/InvocableEntityResource.java | 107 +++++++++++++
.../NodeControllerResourceConfig.java | 8 +-
.../management/connect/ConnectManager.java | 1 -
.../management/pe/InvocableElementManager.java | 34 ++--
.../management/pe/InvocableLifeCycle.java | 2 +-
.../management/relay/DataStreamRelayManager.java | 109 +++++++++++++
.../container/rest/DataStreamRelayResource.java | 80 ----------
.../container/rest/InvocableEntityResource.java | 160 -------------------
streampipes-pipeline-management/pom.xml | 10 ++
.../manager/endpoint/EndpointItemFetcher.java | 14 +-
.../manager/endpoint/EndpointItemParser.java | 20 ++-
.../manager/execution/http/HttpRequestBuilder.java | 23 ++-
.../http/InvocableEntityUrlGenerator.java | 11 +-
.../matching/output/ListOutputSchemaGenerator.java | 4 +-
.../matching/output/PropertyDuplicateRemover.java | 8 +-
.../manager/node/NodeClusterManager.java | 2 +-
.../manager/verification/ElementVerifier.java | 22 ++-
.../manager/verification/SepVerifier.java | 5 +-
.../verification/extractor/TypeExtractor.java | 63 ++++----
.../streampipes/manager/matching/v2/TestUtils.java | 16 +-
streampipes-rest-shared/pom.xml | 2 +-
.../streampipes/rest/shared/util/SpMediaType.java | 5 +-
streampipes-rest/pom.xml | 10 ++
.../rest/api/IMeasurementUnitResource.java | 2 +
.../rest/impl/AbstractRestInterface.java | 16 +-
.../apache/streampipes/rest/impl/Deployment.java | 32 ++--
.../rest/impl/MeasurementUnitResource.java | 9 ++
.../rest}/serializer/JsonLdProvider.java | 4 +-
.../apache/streampipes/rest}/util/JsonLdUtils.java | 7 +-
.../streampipes/rest}/util/JsonLdUtilsTest.java | 4 +-
.../streampipes/sdk/helpers/EpProperties.java | 1 -
.../pom.xml | 13 +-
.../serializers/json/AdapterSerializer.java | 0
.../serializers/json/AdapterTypeAdapter.java | 0
.../serializers/json/EcTypeAdapter.java | 0
.../serializers/json/EpaTypeAdapter.java | 0
.../serializers/json/GsonSerializer.java | 0
.../serializers/json/JacksonSerializer.java | 0
.../serializers/json/JsonLdSerializer.java | 0
.../serializers/json/PeTypeAdapter.java | 0
.../json/ProcessingElementSerializer.java | 0
.../serializers/json/RangeSerializer.java | 34 ++--
.../json/RuntimeTypeAdapterFactory.java | 0
.../json/TransformationRuleSerializer.java | 0
.../serializers/json/UriSerializer.java | 0
.../apache/streampipes/serializers/json/Utils.java | 0
.../pom.xml | 20 ++-
.../jsonld/CustomAnnotationProvider.java | 0
.../serializers/jsonld/JsonLdTransformer.java | 0
.../serializers/jsonld/JsonLdUtils.java | 52 ++----
.../serializers/jsonld/RdfTransformer.java | 0
streampipes-storage-api/pom.xml | 8 +
streampipes-storage-couchdb/pom.xml | 2 +-
.../storage/couchdb/dao/AbstractDao.java | 6 +-
.../storage/couchdb/dao/PersistCommand.java | 8 +-
streampipes-storage-rdf4j/pom.xml | 2 +-
ui/package.json | 50 +++---
.../configuration/configuration.component.spec.ts | 8 +-
.../consul-configs-password.component.spec.ts | 10 +-
.../consul-service.component.spec.ts | 20 +--
.../edit-unit-transformation.component.css | 4 -
.../edit-unit-transformation.component.html | 2 +-
.../apis/measurement-units.service.ts | 32 ++--
ui/src/app/platform-services/platform.module.ts | 4 +-
ui/src/tsconfig.app.json | 2 +-
ui/src/tsconfig.spec.json | 2 +-
ui/tsconfig.base.json | 26 ---
ui/tsconfig.json | 39 +++--
ui/tsconfig.spec.json | 2 +-
119 files changed, 1004 insertions(+), 1516 deletions(-)
diff --cc pom.xml
index 7f2520f,ec9c0d5..28b9c5c
--- a/pom.xml
+++ b/pom.xml
@@@ -56,8 -56,7 +56,9 @@@
<geojson-jackson.version>1.8</geojson-jackson.version>
<guava.version>27.1-jre</guava.version>
<hibernate-validator.version>6.1.5.Final</hibernate-validator.version>
+ <httpclient.version>4.5.13</httpclient.version>
+ <hawtbuf.version>1.11</hawtbuf.version>
+ <httpclient.version>4.5.10</httpclient.version>
<httpcore.version>4.4.9</httpcore.version>
<httpcore-osgi.version>4.4.9</httpcore-osgi.version>
<influxdb.version>2.16</influxdb.version>
@@@ -327,6 -326,6 +328,11 @@@
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
++ <artifactId>httpclient-cache</artifactId>
++ <version>${httpclient.version}</version>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-osgi</artifactId>
<version>${httpcore-osgi.version}</version>
</dependency>
diff --cc streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
index 9018628,5b87557..1797897
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/AbstractPipelineElementResource.java
@@@ -195,14 -181,10 +181,14 @@@ public abstract class AbstractPipelineE
}
} else if (desc instanceof ConsumableStreamPipesEntity) {
Collection<TransportProtocol> supportedProtocols =
- declarersSingleton.getSupportedProtocols();
+ DeclarersSingleton.getInstance().getSupportedProtocols();
Collection<TransportFormat> supportedFormats =
- declarersSingleton.getSupportedFormats();
+ DeclarersSingleton.getInstance().getSupportedFormats();
- ((ConsumableStreamPipesEntity) desc).setElementEndpointHostname(declarersSingleton.getHostname());
- ((ConsumableStreamPipesEntity) desc).setElementEndpointPort(declarersSingleton.getPort());
- ((ConsumableStreamPipesEntity) desc).setElementEndpointServiceName(declarersSingleton.getServiceName());
++ ((ConsumableStreamPipesEntity) desc).setElementEndpointHostname(DeclarersSingleton.getInstance().getHostname());
++ ((ConsumableStreamPipesEntity) desc).setElementEndpointPort(DeclarersSingleton.getInstance().getPort());
++ ((ConsumableStreamPipesEntity) desc).setElementEndpointServiceName(DeclarersSingleton.getInstance().getServiceName());
+
if (supportedProtocols.size() > 0 && supportedFormats.size() > 0) {
// Overwrite existing grounding from default provided by declarers singleton
((ConsumableStreamPipesEntity) desc)
diff --cc streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
index eb79cce,5664a57..a83b595
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
@@@ -71,18 -61,17 +61,18 @@@ public abstract class InvocablePipeline
InvocableDeclarer declarer = (InvocableDeclarer) getDeclarerById(elementId);
if (declarer != null) {
- String runningInstanceId = getInstanceId(graph.getElementId(), elementId);
+ //String runningInstanceId = getInstanceId(graph.getElementId(), elementId);
+ String runningInstanceId = graph.getDeploymentRunningInstanceId();
RunningInstances.INSTANCE.add(runningInstanceId, graph, declarer.getClass().newInstance());
Response resp = RunningInstances.INSTANCE.getInvocation(runningInstanceId).invokeRuntime(graph);
- return Util.toResponseString(resp);
+ return ok(resp);
}
- } catch (RDFParseException | IOException | RepositoryException | InstantiationException | IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
e.printStackTrace();
- return Util.toResponseString(new Response(elementId, false, e.getMessage()));
+ return ok(new Response(elementId, false, e.getMessage()));
}
- return Util.toResponseString(elementId, false, "Could not find the element with id: " + elementId);
+ return ok(new Response(elementId, false, "Could not find the element with id: " + elementId));
}
@POST
diff --cc streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
index 903321d,0000000..be7ee90
mode 100644,000000..100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
@@@ -1,134 -1,0 +1,134 @@@
+package org.apache.streampipes.container.util;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
+import org.apache.streampipes.container.model.node.InvocableRegistration;
+import org.apache.streampipes.container.model.consul.ConsulServiceRegistrationBody;
+import org.apache.streampipes.container.model.consul.HealthCheckConfiguration;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class NodeControllerUtil {
+ static Logger LOG = LoggerFactory.getLogger(NodeControllerUtil.class);
+
+ private static final String HTTP_PROTOCOL = "http://";
+ private static final String COLON = ":";
+ private static final String SLASH = "/";
+ private static final String HEALTH_CHECK_INTERVAL = "10s";
+ private static final String PE_TAG = "pe";
+ private static final String SECONDARY_PE_IDENTIFIER_TAG = "secondary";
- private static final String NODE_CONTROLLER_REGISTER_SVC_URL = "api/v2/node/container/register";
++ private static final String NODE_CONTROLLER_REGISTER_SVC_URL = "api/v2/node/element/register";
+
+ private static final String NODE_CONTROLLER_CONTAINER_HOST = "SP_NODE_CONTROLLER_CONTAINER_HOST";
+ private static final String NODE_CONTROLLER_CONTAINER_PORT = "SP_NODE_CONTROLLER_CONTAINER_PORT";
+
+ public static void register(String serviceID, String host, int port,
+ Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+ register(PE_TAG, makeSvcId(host, serviceID), host, port,
+ Arrays.asList(PE_TAG, SECONDARY_PE_IDENTIFIER_TAG), epaDeclarers);
+ }
+
+ public static void register(String svcName, String svcId, String host, int port, List<String> tag,
+ Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+ boolean connected = false;
+
+ while (!connected) {
+ LOG.info("Trying to register pipeline element container at node controller: " + makeRegistrationEndpoint());
+ String body = createSvcBody(svcName, svcId, host, port, tag, epaDeclarers);
+ connected = registerSvcHttpClient(body);
+
+ if (!connected) {
+ LOG.info("Retrying in 5 seconds");
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ LOG.info("Successfully registered pipeline element container: " + svcId);
+ }
+
+ private static boolean registerSvcHttpClient(String body) {
+ String endpoint = makeRegistrationEndpoint();
+ try {
+ Request.Post(makeRegistrationEndpoint())
+ .bodyString(body, ContentType.APPLICATION_JSON)
+ .connectTimeout(1000)
+ .socketTimeout(100000)
+ .execute();
+ return true;
+ } catch (IOException e) {
+ LOG.error("Could not register at " + endpoint);
+ }
+ return false;
+ }
+
+ private static String createSvcBody(String name, String id, String host, int port, List<String> tags,
+ Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+ try {
+ ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
+ String healthCheckURL = HTTP_PROTOCOL + host + COLON + port;
+ body.setID(id);
+ body.setName(name);
+ body.setTags(tags);
+ body.setAddress(HTTP_PROTOCOL + host);
+ body.setPort(port);
+ body.setEnableTagOverride(true);
+ body.setCheck(new HealthCheckConfiguration("GET", healthCheckURL, HEALTH_CHECK_INTERVAL));
+
+ InvocableRegistration svcBody = new InvocableRegistration();
+ svcBody.setConsulServiceRegistrationBody(body);
+ svcBody.setSupportedPipelineElementAppIds(new ArrayList<>(epaDeclarers.keySet()));
+
+ return JacksonSerializer.getObjectMapper().writeValueAsString(svcBody);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ throw new IllegalArgumentException("Failure");
+ }
+
+ private static String makeRegistrationEndpoint() {
+ if (System.getenv(NODE_CONTROLLER_CONTAINER_HOST) != null) {
+ return HTTP_PROTOCOL
+ + System.getenv(NODE_CONTROLLER_CONTAINER_HOST)
+ + COLON
+ + System.getenv(NODE_CONTROLLER_CONTAINER_PORT)
+ + SLASH
+ + NODE_CONTROLLER_REGISTER_SVC_URL;
+ } else {
+ return HTTP_PROTOCOL
+ + "localhost"
+ + COLON
+ + "7077"
+ + SLASH
+ + NODE_CONTROLLER_REGISTER_SVC_URL;
+ }
+ }
+
+ private static String makeSvcId(String host, String serviceID) {
+ return host + SLASH + serviceID;
+ }
+}
diff --cc streampipes-container/src/main/java/org/apache/streampipes/container/util/Util.java
index 5b39877,0761f12..572b00d
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/Util.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/Util.java
@@@ -26,11 -26,11 +26,11 @@@ public class Util
private static final String Slash = "/";
-- public static String getInstanceId(String url, String type, String elemntId) {
++ public static String getInstanceId(String url, String type, String elementId) {
return url.replace(DeclarersSingleton.getInstance().getBaseUri()
+ type
+ Slash
-- + elemntId
++ + elementId
+ Slash, "");
}
diff --cc streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
index 58774ca,2e1ad1a..8394db8
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java
@@@ -134,14 -135,14 +136,22 @@@ public class Pipeline extends ElementCo
this.createdAt = createdAt;
}
+ public String getEventRelayStrategy() {
+ return eventRelayStrategy;
+ }
+
+ public void setEventRelayStrategy(String eventRelayStrategy) {
+ this.eventRelayStrategy = eventRelayStrategy;
+ }
+
+ public boolean isRestartOnSystemReboot() {
+ return restartOnSystemReboot;
+ }
+
+ public void setRestartOnSystemReboot(boolean restartOnSystemReboot) {
+ this.restartOnSystemReboot = restartOnSystemReboot;
+ }
+
public Pipeline clone() {
Pipeline pipeline = new Pipeline();
pipeline.setName(name);
diff --cc streampipes-node-controller-container/pom.xml
index a593487,d9444f2..4076064
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@@ -75,79 -108,31 +75,78 @@@
<version>0.68.0-SNAPSHOT</version>
</dependency>
- <!-- External dependencies -->
- <dependency>
- <groupId>org.apache.maven.shared</groupId>
- <artifactId>maven-invoker</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jgrapht</groupId>
- <artifactId>jgrapht-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.objenesis</groupId>
- <artifactId>objenesis</artifactId>
- </dependency>
-
- <!-- Test dependencies -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-measurement-units</artifactId>
- <version>0.68.0-SNAPSHOT</version>
- <scope>test</scope>
+ <!--external dependencies-->
+ <!-- parse yaml config -->
+<!-- <dependency>-->
+<!-- <groupId>org.yaml</groupId>-->
+<!-- <artifactId>snakeyaml</artifactId>-->
+<!-- <version>1.21</version>-->
+<!-- </dependency>-->
+ <!-- docker client for java -->
+ <dependency>
+ <groupId>com.spotify</groupId>
+ <artifactId>docker-client</artifactId>
+ <classifier>shaded</classifier>
+ <version>8.16.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm-tree</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm-commons</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm-analysis</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-jaxb-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+<!-- </dependency>-->
+ <!-- used for gathering system information-->
+ <dependency>
+ <groupId>com.github.oshi</groupId>
+ <artifactId>oshi-core</artifactId>
+ <version>5.3.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>1.2.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
- <version>2.2.0</version>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>repackage</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <finalName>streampipes-node-controller-container</finalName>
+ </build>
</project>
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
index be6c385,0000000..48e2e88
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
@@@ -1,78 -1,0 +1,78 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.container;
+
+import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerManager;
- import org.apache.streampipes.node.controller.container.rest.NodeControllerResourceConfig;
++import org.apache.streampipes.node.controller.container.api.NodeControllerResourceConfig;
+import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
+import org.apache.streampipes.node.controller.container.management.node.NodeManager;
+import org.apache.streampipes.node.controller.container.management.janitor.JanitorManager;
+import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+import javax.annotation.PreDestroy;
+import java.util.Collections;
+
+@Configuration
+@EnableAutoConfiguration
+@Import({ NodeControllerResourceConfig.class })
+public class NodeControllerInit {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NodeControllerInit.class.getCanonicalName());
+
+ public static void main(String [] args) {
+
+ NodeControllerConfig conf = NodeControllerConfig.INSTANCE;
+
+ SpringApplication app = new SpringApplication(NodeControllerInit.class);
+ app.setDefaultProperties(Collections.singletonMap("server.port", conf.getNodeControllerPort()));
+ app.run();
+
+ LOG.info("Load node info");
+ NodeManager.getInstance().init();
+
+ LOG.info("Start Node resource manager");
+ ResourceManager.getInstance().run();
+
+ if (!"true".equals(System.getenv("SP_DEBUG"))) {
+ LOG.info("Auto-deploy StreamPipes node container");
+ DockerContainerManager.getInstance().init();
+
+ LOG.info("Start Janitor manager");
+ JanitorManager.getInstance().run();
+ }
+
+ // registration with consul here
+ ConsulUtil.registerNodeService(
+ conf.getNodeServiceId(),
+ conf.getNodeHostName(),
+ conf.getNodeControllerPort()
+ );
+ }
+
+ @PreDestroy
+ public void onExit(){
+ }
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AbstractResource.java
index 4d25c34,0000000..dc18520
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AbstractResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AbstractResource.java
@@@ -1,61 -1,0 +1,61 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
- package org.apache.streampipes.node.controller.container.rest;
++package org.apache.streampipes.node.controller.container.api;
+
+import org.apache.streampipes.model.message.Message;
+
+import javax.ws.rs.core.Response;
+
+
+public abstract class AbstractResource {
+
+ protected <T> Response ok(T entity) {
+ return Response
+ .ok()
+ .entity(entity)
+ .build();
+ }
+
+ protected <T> Response ok() {
+ return Response
+ .ok()
+ .build();
+ }
+
+ protected <T> Response error(T entity) {
+ return Response
+ .status(500)
+ .entity(entity)
+ .build();
+ }
+
+ protected Response statusMessage(Message message) {
+ return ok(message);
+ }
+
+ protected Response fail() {
+ return Response.serverError().build();
+ }
+
+ protected <T> Response fail(T entity) {
+ return Response
+ .serverError()
+ .entity(entity)
+ .build();
+ }
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java
index 0000000,0000000..00bb34a
new file mode 100644
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/AdapterDataStreamRelayResource.java
@@@ -1,0 -1,0 +1,46 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ */
++package org.apache.streampipes.node.controller.container.api;
++
++import org.apache.streampipes.model.SpDataStreamRelayContainer;
++import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
++import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
++
++import javax.ws.rs.*;
++import javax.ws.rs.core.MediaType;
++
++@Path("/api/v2/node/stream/relay")
++public class AdapterDataStreamRelayResource extends AbstractResource {
++
++ @POST
++ @JacksonSerialized
++ @Path("/invoke")
++ @Consumes(MediaType.APPLICATION_JSON)
++ @Produces(MediaType.APPLICATION_JSON)
++ public javax.ws.rs.core.Response invoke(SpDataStreamRelayContainer graph) {
++ return ok(DataStreamRelayManager.getInstance().startAdapterDataStreamRelay(graph));
++ }
++
++ @DELETE
++ @Path("/detach/{runningInstanceId}")
++ @Consumes(MediaType.APPLICATION_JSON)
++ @Produces(MediaType.APPLICATION_JSON)
++ public javax.ws.rs.core.Response detach(@PathParam("runningInstanceId") String runningInstanceId) {
++ return ok(DataStreamRelayManager.getInstance().stopAdapterDataStreamRelay(runningInstanceId));
++ }
++}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ConnectResource.java
index 0439633,0000000..2865cc1
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ConnectResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ConnectResource.java
@@@ -1,151 -1,0 +1,148 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
- package org.apache.streampipes.node.controller.container.rest;
++package org.apache.streampipes.node.controller.container.api;
+
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
+import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
+import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
+import org.apache.streampipes.node.controller.container.management.connect.ConnectManager;
+import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+
+@Path("/api/v1/{username}/worker")
+public class ConnectResource extends AbstractResource {
- private static final Logger LOG = LoggerFactory.getLogger(ConnectResource.class.getCanonicalName());
+
+ // Registration
+ @POST
+ @JacksonSerialized
+ @Path("/register")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response register(@PathParam("username") String username, ConnectWorkerContainer cw) {
+ return ok(ConnectManager.getInstance().register(username, cw));
+ }
+
+ // AdapterResource
+ @GET
+ @Path("/adapters/{id}/assets")
+ @Produces("application/zip")
+ public Response getAdapterAssets(@PathParam("username") String username, @PathParam("id") String appId) {
+ return ok(ConnectManager.getInstance().assets(username, appId, "adapter", "/"));
+ }
+
+ @GET
+ @Path("/adapters/{id}/assets/icon")
+ @Produces("image/png")
+ public Response getAdapterIconAsset(@PathParam("username") String username, @PathParam("id") String appId) {
+ return ok(ConnectManager.getInstance().assets(username, appId, "adapter", "/icon"));
+ }
+
+ @GET
+ @Path("/adapters/{id}/assets/documentation")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String getAdapterDocumentationAsset(@PathParam("username") String username, @PathParam("id") String appId) {
+ return ConnectManager.getInstance()
+ .assets(username, appId, "adapter", "/documentation").toString();
+ }
+
+ // ProtocolResource
+ @GET
+ @Path("/protocols/{id}/assets")
+ @Produces("application/zip")
+ public Response getProtocolAssets(@PathParam("username") String username, @PathParam("id") String appId) {
+ return ok(ConnectManager.getInstance().assets(username, appId, "protocol", "/"));
+ }
+
+ @GET
+ @Path("/protocols/{id}/assets/icon")
+ @Produces("image/png")
+ public Response getProtocolIconAsset(@PathParam("username") String username, @PathParam("id") String appId) {
+ return ok(ConnectManager.getInstance().assets(username, appId, "protocol", "/icon"));
+ }
+
+ @GET
+ @Path("/protocols/{id}/assets/documentation")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String getProtocolDocumentationAsset(@PathParam("username") String username,
+ @PathParam("id") String appId) {
+ return ConnectManager.getInstance()
+ .assets(username, appId, "protocol", "documentation").toString();
+ }
+
+ // WorkerResource
+ @POST
+ @JacksonSerialized
+ @Path("/stream/invoke")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response invokeStreamAdapter(@PathParam("username") String username, AdapterStreamDescription ad) {
+ return ok(ConnectManager.getInstance().invoke(username, ad));
+ }
+
+ @POST
+ @JacksonSerialized
+ @Path("/stream/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response stopStreamAdapter(@PathParam("username") String username, AdapterStreamDescription ad) {
+ return ok(ConnectManager.getInstance().stop(username, ad));
+ }
+
+ @POST
+ @JacksonSerialized
+ @Path("/set/invoke")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response invokeSetAdapter(@PathParam("username") String username, AdapterSetDescription ad) {
+ return ok(ConnectManager.getInstance().invoke(username, ad));
+ }
+
+ @POST
+ @JacksonSerialized
+ @Path("/set/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response stopSetAdapter(@PathParam("username") String username, AdapterSetDescription ad){
+ return ok(ConnectManager.getInstance().stop(username, ad));
+ }
+
+ @POST
+ @JacksonSerialized
+ @Path("/guess/schema")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response guessSchema(@PathParam("username") String username, AdapterDescription ad) {
+ return ok(ConnectManager.getInstance().guess(username, ad));
+ }
+
+ // RuntimeResolvableResource
+ @POST
+ @Path("/resolvable/{id}/configurations")
+ @JacksonSerialized
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response fetchConfigurations(@PathParam("username") String username, @PathParam("id") String appId,
+ RuntimeOptionsRequest runtimeOptions) {
+ return ok(ConnectManager.getInstance().fetchConfigurations(username, appId, runtimeOptions));
+ }
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerResource.java
index 0000000,0000000..21733ef
new file mode 100644
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerResource.java
@@@ -1,0 -1,0 +1,50 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ */
++package org.apache.streampipes.node.controller.container.api;
++
++import org.apache.streampipes.model.node.container.DockerContainer;
++import org.apache.streampipes.node.controller.container.management.orchestrator.docker.DockerContainerManager;
++import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
++
++import javax.ws.rs.*;
++import javax.ws.rs.core.MediaType;
++
++@Path("/api/v2/node/container")
++public class ContainerResource extends AbstractResource {
++
++ @GET
++ @Produces(MediaType.APPLICATION_JSON)
++ public javax.ws.rs.core.Response getPipelineElementContainer(){
++ return ok(DockerContainerManager.getInstance().list());
++ }
++
++ @POST
++ @Path("/deploy")
++ @Consumes(MediaType.APPLICATION_JSON)
++ public javax.ws.rs.core.Response deployPipelineElementContainer(DockerContainer container) {
++ return ok(DockerContainerManager.getInstance().deploy(container));
++ }
++
++ @DELETE
++ @Path("/remove")
++ @Consumes(MediaType.APPLICATION_JSON)
++ public javax.ws.rs.core.Response removePipelineElementContainer(DockerContainer container) {
++ InvocableElementManager.getInstance().unregister();
++ return ok(DockerContainerManager.getInstance().remove(container));
++ }
++}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
index 573d13d,f3eb16a..6e5ebb0
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataProcessorPipelineElementResource.java
@@@ -1,32 -1,29 +1,30 @@@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import {StreampipesPeContainerConifgs} from "./streampipes-pe-container-configs";
-
-//ConsulService = StreampipesPeContainer ERLEDIGT
-export interface StreampipesPeContainer {
- name: string;
- mainKey: string;
- meta: {
- status: string;
- }
- configs: [StreampipesPeContainerConifgs];
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
++package org.apache.streampipes.node.controller.container.api;
+
- package org.apache.streampipes.rest.shared.annotation;
++import org.apache.streampipes.model.graph.DataProcessorInvocation;
+
- import javax.ws.rs.NameBinding;
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
++import javax.ws.rs.Path;
+
- @NameBinding
- @Target({ElementType.TYPE, ElementType.METHOD})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface RdfRootElement {
- String value();
++@Path("/api/v2/node/element/sepa")
++public class DataProcessorPipelineElementResource extends InvocableEntityResource<DataProcessorInvocation> {
++
++ public DataProcessorPipelineElementResource() {
++ super(DataProcessorInvocation.class);
++ }
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
index 0e851d6,f3eb16a..af99b25
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/DataSinkPipelineElementResource.java
@@@ -1,30 -1,29 +1,30 @@@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import {StreampipesPeContainerConifgs} from "./streampipes-pe-container-configs";
-
-//ConsulService = StreampipesPeContainer ERLEDIGT
-export interface StreampipesPeContainer {
- name: string;
- mainKey: string;
- meta: {
- status: string;
- }
- configs: [StreampipesPeContainerConifgs];
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
- package org.apache.streampipes.rest.shared.annotation;
++package org.apache.streampipes.node.controller.container.api;
+
- import javax.ws.rs.NameBinding;
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
++import org.apache.streampipes.model.graph.DataSinkInvocation;
+
- @NameBinding
- @Target({ElementType.TYPE, ElementType.METHOD})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface JsonLdSerialized {
++import javax.ws.rs.Path;
++
++@Path("/api/v2/node/element/sec")
++public class DataSinkPipelineElementResource extends InvocableEntityResource<DataSinkInvocation> {
++
++ public DataSinkPipelineElementResource() {
++ super(DataSinkInvocation.class);
++ }
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
index 37796be,ff6da01..8c593a4
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
@@@ -15,22 -15,19 +15,22 @@@
* limitations under the License.
*
*/
- package org.apache.streampipes.node.controller.container.rest;
++package org.apache.streampipes.node.controller.container.api;
-import {Component, Input} from '@angular/core';
-import {StreampipesPeContainerConifgs} from "../shared/streampipes-pe-container-configs";
-import {ConfigurationService} from '../shared/configuration.service';
+import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
-@Component({
- selector: 'consul-configs-number',
- templateUrl: './consul-configs-number.component.html',
- styleUrls: ['./consul-configs-number.component.css']
-})
-export class ConsulConfigsNumberComponent {
- @Input() configuration: StreampipesPeContainerConifgs
- constructor(public configService:ConfigurationService) {
- }
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Path("/")
+public class HealthCheckResource extends AbstractResource {
-}
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getHealth() {
- return ok(String.format("hello from node controller: %s", NodeControllerConfig.INSTANCE.getNodeControllerId()));
++ return ok(String.format("PONG: %s", NodeControllerConfig.INSTANCE.getNodeControllerId()));
+ }
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
index 949d5e1,0000000..b3a01f2
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InfoStatusResource.java
@@@ -1,82 -1,0 +1,59 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
- package org.apache.streampipes.node.controller.container.rest;
++package org.apache.streampipes.node.controller.container.api;
+
- import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.node.controller.container.management.node.NodeManager;
- import org.apache.streampipes.node.controller.container.management.relay.EventRelay;
- import org.apache.streampipes.node.controller.container.management.relay.RunningRelayInstances;
- import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
++import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;;
+import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
+import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
- import org.apache.streampipes.serializers.json.JacksonSerializer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
- import java.util.List;
- import java.util.stream.Collectors;
+
- @Path("/api/v2/node")
++@Path("/api/v2/node/info")
+public class InfoStatusResource extends AbstractResource {
- private static final Logger LOG = LoggerFactory.getLogger(InfoStatusResource.class.getCanonicalName());
+
+ @GET
- @Path("/info")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getNodeInfo() {
+ return ok(NodeManager.getInstance().retrieveNodeInfoDescription());
+ }
+
+ @PUT
- @Path("/update")
+ @JacksonSerialized
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response updateNodeInfo(NodeInfoDescription desc) {
+ return ok(NodeManager.getInstance().updateNodeInfoDescription(desc));
+ }
+
+ @GET
- @Path("/status")
++ @Path("/resources")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getStatus() {
+ return ok(ResourceManager.getInstance().retrieveNodeResources());
+ }
+
+ @GET
- @Path("/metrics")
++ @Path("/relays")
+ @Produces(MediaType.APPLICATION_JSON)
- public Response getMetrics() {
- try {
- List<RelayMetrics> metrics = RunningRelayInstances.INSTANCE.getRunningInstances()
- .stream()
- .map(EventRelay::getRelayMetrics)
- .collect(Collectors.toList());
-
- String metricsList = JacksonSerializer.getObjectMapper().writeValueAsString(metrics);
-
- return ok(metricsList);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- return fail();
++ public Response getAllRelays() {
++ return ok(DataStreamRelayManager.getInstance().getAllRelays());
+ }
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 0000000,0000000..0c71b57
new file mode 100644
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@@ -1,0 -1,0 +1,107 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ */
++package org.apache.streampipes.node.controller.container.api;
++
++import com.fasterxml.jackson.core.JsonProcessingException;
++import org.apache.streampipes.commons.exceptions.SpRuntimeException;
++import org.apache.streampipes.container.model.node.InvocableRegistration;
++import org.apache.streampipes.model.Response;
++import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
++import org.apache.streampipes.model.graph.DataProcessorInvocation;
++import org.apache.streampipes.model.graph.DataSinkInvocation;
++import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
++import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
++import org.apache.streampipes.node.controller.container.management.relay.DataStreamRelayManager;
++import org.apache.streampipes.serializers.json.JacksonSerializer;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import javax.ws.rs.*;
++import javax.ws.rs.core.MediaType;
++
++public abstract class InvocableEntityResource<I extends InvocableStreamPipesEntity> extends AbstractResource {
++ private static final Logger LOG = LoggerFactory.getLogger(InvocableEntityResource.class.getCanonicalName());
++
++ private static final String SLASH = "/";
++
++ protected Class<I> clazz;
++
++ public InvocableEntityResource(Class<I> clazz) {
++ this.clazz = clazz;
++ }
++
++ @POST
++ @Path("/register")
++ public void register(InvocableRegistration registration) {
++ InvocableElementManager.getInstance().register(registration);
++ }
++
++ @POST
++ @Path("{elementId}")
++ @Consumes(MediaType.APPLICATION_JSON)
++ @Produces(MediaType.APPLICATION_JSON)
++ public javax.ws.rs.core.Response invoke(@PathParam("elementId") String elementId, I graph) {
++ String endpoint;
++
++ if (graph instanceof DataProcessorInvocation) {
++ endpoint = graph.getBelongsTo();
++ DataStreamRelayManager.getInstance().startPipelineElementDataStreamRelay((DataProcessorInvocation) graph);
++ Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
++ if (resp.isSuccess()) {
++ RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
++ }
++ return ok(resp);
++ }
++ // Currently no data sinks are registered at node controller. If we, at some point, want to also run data
++ // sinks on edge nodes we need to register there Declarer at the node controller one startup.
++ else if (graph instanceof DataSinkInvocation) {
++ endpoint = graph.getBelongsTo();
++ Response resp = InvocableElementManager.getInstance().invoke(endpoint, toJson(graph));
++ if (resp.isSuccess()) {
++ RunningInvocableInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), graph);
++ }
++ return ok(resp);
++ }
++
++ return ok();
++ }
++
++ @DELETE
++ @Path("{elementId}/{runningInstanceId}")
++ @Produces(MediaType.APPLICATION_JSON)
++ public javax.ws.rs.core.Response detach(@PathParam("elementId") String elementId,
++ @PathParam("runningInstanceId") String runningInstanceId) {
++ LOG.info("receive stop request elementId={}, runningInstanceId={}", elementId, runningInstanceId);
++
++ String endpoint = RunningInvocableInstances.INSTANCE.get(runningInstanceId).getBelongsTo();
++ Response resp = InvocableElementManager.getInstance().detach(endpoint + SLASH + runningInstanceId);
++ RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
++ DataStreamRelayManager.getInstance().stopPipelineElementDataStreamRelay(runningInstanceId);
++
++ return ok(resp);
++ }
++
++ private String toJson(I graph) {
++ try {
++ return JacksonSerializer.getObjectMapper().writeValueAsString(graph);
++ } catch (JsonProcessingException e) {
++ e.printStackTrace();
++ }
++ throw new SpRuntimeException("Could not serialize object: " + graph);
++ }
++}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
index 97b2dd2,630dcb5..8db6aba
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
@@@ -15,19 -15,20 +15,21 @@@
* limitations under the License.
*
*/
- package org.apache.streampipes.node.controller.container.rest;
++package org.apache.streampipes.node.controller.container.api;
-import {Component, Input} from '@angular/core';
-import {StreampipesPeContainerConifgs} from "../shared/streampipes-pe-container-configs";
-import {ConfigurationService} from '../shared/configuration.service'
+import org.glassfish.jersey.server.ResourceConfig;
+import org.springframework.stereotype.Component;
-@Component({
- selector: 'consul-configs-boolean',
- templateUrl: './consul-configs-boolean.component.html',
- styleUrls: ['./consul-configs-boolean.component.css'],
- providers: [ConfigurationService]
-})
-export class ConsulConfigsBooleanComponent {
- @Input() configuration: StreampipesPeContainerConifgs
- constructor(public configService:ConfigurationService) {
- }
+@Component
+public class NodeControllerResourceConfig extends ResourceConfig {
-}
+ public NodeControllerResourceConfig() {
+ register(HealthCheckResource.class);
+ register(InfoStatusResource.class);
- register(InvocableEntityResource.class);
- register(DataStreamRelayResource.class);
++ register(DataProcessorPipelineElementResource.class);
++ register(DataSinkPipelineElementResource.class);
++ register(AdapterDataStreamRelayResource.class);
+ register(ConnectResource.class);
++ register(ContainerResource.class);
+ }
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/connect/ConnectManager.java
index 3eee852,0000000..0756e1b
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/connect/ConnectManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/connect/ConnectManager.java
@@@ -1,225 -1,0 +1,224 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.container.management.connect;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.apache.http.util.EntityUtils;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
+import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
+import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class ConnectManager {
-
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ConnectManager.class.getCanonicalName());
+
+ private static final String HTTP_PROTOCOL = "http://";
+ private static final String COLON = ":";
+ private static final String SLASH = "/";
+ private static final String BACKEND_HOST = NodeControllerConfig.INSTANCE.getBackendHost();
+ private static final int BACKEND_PORT = NodeControllerConfig.INSTANCE.getBackendPort();
+ private static final String BACKEND_ADMINISTRATION_ROUTE = "/streampipes-backend/api/v2/connect/{username}/master" +
+ "/administration";
+
+ // Connect adapter base route
+ // TODO: get from registered extensions or connect adapater config
+ private static final String CONNECT_WORKER_HOST = "localhost";
+ private static final int CONNECT_WORKER_PORT = 7024;
+ private static final String CONNECT_WORKER_BASE_ROUTE = "/api/v1/{username}/worker";
+ private static final String STREAM_ROUTE = "/stream";
+ private static final String SET_ROUTE = "/set";
+ private static final String INVOKE_ROUTE = "/invoke";
+ private static final String STOP_ROUTE ="/stop";
+ private static final String GUESS_ROUTE = "/guess/schema";
+ private static final String RESOLVABLE_ROUTE = "/resolvable/{id}/configurations";
+ private static final String ADAPTER_ROUTE = "/adapters/{id}/assets";
+ private static final String PROCOTOL_ROUTE = "/protocols/{id}/assets";
+
+ private static final Integer CONNECT_TIMEOUT = 10000;
+ private static final Integer SOCKET_TIMEOUT = 100000;
+ private static ConnectManager instance = null;
+
+ private ConnectManager() {}
+
+ public static ConnectManager getInstance() {
+ if (instance == null) {
+ synchronized (ConnectManager.class) {
+ if (instance == null)
+ instance = new ConnectManager();
+ }
+ }
+ return instance;
+ }
+
+ // adapter -> backend communication: registration
+
+ // MasterRestClient
+ public String register(String username, ConnectWorkerContainer wc) {
+ String endpoint = (backendUrl() + BACKEND_ADMINISTRATION_ROUTE.replace("{username}", username));
+ LOG.info("Trying to register connect worker at backend: " + endpoint);
+ return post(endpoint , jackson(wc)).toString();
+ }
+
+ // backend -> adapter communication
+
+ // WorkerResource
+ public <T extends AdapterDescription> String invoke(String username, T ad) {
+ LOG.info("Invoke adapter: appId=" + ad.getAppId() + ", name=" + ad.getName());
+ if (ad instanceof AdapterStreamDescription) {
+ return post(endpointFromDescription(username, ad, STREAM_ROUTE + INVOKE_ROUTE), jackson(ad)).toString();
+ } else if (ad instanceof AdapterSetDescription) {
+ return post(endpointFromDescription(username, ad, SET_ROUTE + INVOKE_ROUTE), jackson(ad)).toString();
+ }
+ throw new SpRuntimeException("Could not invoke adapter: " + ad.getAppId());
+ }
+
+ public <T extends AdapterDescription> String stop(String username, T ad) {
+ LOG.info("Stop adapter: appId=" + ad.getAppId() + ", name=" + ad.getName());
+ if (ad instanceof AdapterStreamDescription) {
+ return post(endpointFromDescription(username, ad, STREAM_ROUTE + STOP_ROUTE), jackson(ad)).toString();
+ } else if (ad instanceof AdapterSetDescription) {
+ return post(endpointFromDescription(username, ad, SET_ROUTE + STOP_ROUTE), jackson(ad)).toString();
+ }
+ throw new SpRuntimeException("Could not stop adapter: " + ad.getAppId());
+ }
+
+ // GuessResource
+ public GuessSchema guess(String username, AdapterDescription ad) {
+ try {
+ LOG.info("Trying to guess schema: " + ad.getAppId());
+
+ Response resp = post(endpointFromDescription(username, ad, GUESS_ROUTE), jackson(ad));
+ HttpResponse httpResponse = resp.returnResponse();
+ String responseString = EntityUtils.toString(httpResponse.getEntity());
+ return JacksonSerializer.getObjectMapper().readValue(responseString, GuessSchema.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ throw new SpRuntimeException("Could not deserialize object");
+ }
+
+ // RuntimeResolvableResource
+ public String fetchConfigurations(String username, String appId, RuntimeOptionsRequest runtimeOptions) {
+ String endpoint = endpointFromStringRoute(username, RESOLVABLE_ROUTE.replace("{id}", appId));
+ LOG.info("Trying to fetch configurations at: " + endpoint);
+ return post(endpoint, jackson(runtimeOptions)).toString();
+ }
+
+ // AdapterResource
+ public byte[] assets(String username, String appId, String assetType, String subroute) {
+ String endpoint = "";
+ if ("adapter".equals(assetType)) {
+ if (subroute.isEmpty()) {
+ endpoint = endpointFromStringRoute(username, ADAPTER_ROUTE.replace("{id}", appId));
+ } else {
+ endpoint = endpointFromStringRoute(username, (ADAPTER_ROUTE.replace("{id}", appId) + subroute));
+ }
+ } else if ("protocol".equals(assetType)) {
+ if (subroute.isEmpty()) {
+ endpoint = endpointFromStringRoute(username, PROCOTOL_ROUTE.replace("{id}", appId));
+ } else {
+ endpoint = endpointFromStringRoute(username, (PROCOTOL_ROUTE.replace("{id}", appId) + subroute));
+ }
+ }
+ return get(endpoint);
+ }
+
+ // Helper methods
+ private Response post(String endpoint, String payload) {
+ try {
+ return Request.Post(endpoint)
+ .bodyString(payload, ContentType.APPLICATION_JSON)
+ .connectTimeout(CONNECT_TIMEOUT)
+ .socketTimeout(SOCKET_TIMEOUT)
+ .execute();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ throw new SpRuntimeException("Post request was not successful");
+ }
+
+ private byte[] get(String endpoint) {
+ try {
+ return Request.Get(endpoint)
+ .connectTimeout(CONNECT_TIMEOUT)
+ .socketTimeout(SOCKET_TIMEOUT)
+ .execute().returnContent().asBytes();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ throw new SpRuntimeException("Get request was not successful");
+ }
+
+ private <T extends AdapterDescription> String jackson(T ad) {
+ try {
+ return JacksonSerializer.getObjectMapper().writeValueAsString(ad);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ throw new SpRuntimeException("Could not serialize object");
+ }
+
+ private <T extends UnnamedStreamPipesEntity> String jackson(T ad) {
+ try {
+ return JacksonSerializer.getObjectMapper().writeValueAsString(ad);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ throw new SpRuntimeException("Could not serialize object");
+ }
+
+ private <T extends AdapterDescription> String endpointFromDescription(String username, T ad, String subroute) {
+ return workerUrl(ad) + addUserToBaseRoute(username) + subroute;
+ }
+
+ private String endpointFromStringRoute(String username, String subroute) {
+ return workerUrl() + addUserToBaseRoute(username) + subroute;
+ }
+
+ private String addUserToBaseRoute(String username) {
+ return CONNECT_WORKER_BASE_ROUTE.replace("{username}", username);
+ }
+
+
+ private String backendUrl() {
+ return HTTP_PROTOCOL + BACKEND_HOST + COLON + BACKEND_PORT;
+ }
+
+ private String workerUrl() {
+ return HTTP_PROTOCOL + CONNECT_WORKER_HOST + COLON + CONNECT_WORKER_PORT;
+ }
+
+ private <T extends AdapterDescription> String workerUrl(T ad) {
+ return HTTP_PROTOCOL + ad.getElementEndpointHostname() + COLON + ad.getElementEndpointPort();
+ }
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index d89fddd,0000000..692f3b4
mode 100644,000000..100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@@ -1,188 -1,0 +1,186 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.container.management.pe;
+
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.google.gson.Gson;
- import org.apache.http.client.ClientProtocolException;
++import com.google.gson.JsonSyntaxException;
+import org.apache.http.client.fluent.Request;
- import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.container.model.node.InvocableRegistration;
++import org.apache.streampipes.model.Response;
+import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
+import org.apache.streampipes.node.controller.container.management.node.NodeManager;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class InvocableElementManager implements InvocableLifeCycle {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(InvocableElementManager.class.getCanonicalName());
+
+ private static final String HTTP_PROTOCOL = "http://";
+ private static final String COLON = ":";
+ private static final String SLASH = "/";
+ private static final String ENV_CONSUL_LOCATION = "CONSUL_LOCATION";
+ private static final Integer CONNECT_TIMEOUT = 10000;
+ private static InvocableElementManager instance = null;
+
+ private InvocableElementManager() {}
+
+ public static InvocableElementManager getInstance() {
+ if (instance == null) {
+ synchronized (InvocableElementManager.class) {
+ if (instance == null)
+ instance = new InvocableElementManager();
+ }
+ }
+ return instance;
+ }
+
+ @Override
+ public void register(InvocableRegistration registration) {
+ try {
+ Request.Put(makeConsulRegistrationEndpoint())
+ .addHeader("accept", "application/json")
+ .body(new StringEntity(JacksonSerializer
+ .getObjectMapper()
+ .writeValueAsString(registration.getConsulServiceRegistrationBody())))
+ .execute();
+
+ // TODO: persistent storage to survive failures
+ NodeManager.getInstance()
+ .retrieveNodeInfoDescription()
+ .setSupportedElements(registration.getSupportedPipelineElementAppIds());
+
+ String url = "http://"
+ + NodeControllerConfig.INSTANCE.getBackendHost()
+ + ":"
+ + NodeControllerConfig.INSTANCE.getBackendPort()
+ + "/"
+ + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes"
+ + "/"
+ + NodeControllerConfig.INSTANCE.getNodeControllerId();
+
+ String desc = JacksonSerializer.getObjectMapper()
+ .writeValueAsString(NodeManager.getInstance().retrieveNodeInfoDescription());
+
+ Request.Put(url)
+ .bodyString(desc, ContentType.APPLICATION_JSON)
+// .connectTimeout(1000)
+// .socketTimeout(100000)
+ .execute();
+
+ LOG.info("Successfully registered pipeline element container");
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
- public org.apache.streampipes.model.Response invoke(String endpoint, String payload) {
++ public Response invoke(String endpoint, String payload) {
+ LOG.info("Invoke pipeline element: {}", endpoint);
+ try {
- Response httpResp = Request
++ org.apache.http.client.fluent.Response httpResp = Request
+ .Post(endpoint)
+ .bodyString(payload, ContentType.APPLICATION_JSON)
+ .connectTimeout(CONNECT_TIMEOUT)
+ .execute();
-
- return new Gson().fromJson(httpResp.returnContent().asString(),
- org.apache.streampipes.model.Response.class);
-
++ return handleResponse(httpResp);
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ }
+ throw new RuntimeException("Failed to invoke pipeline element: " + endpoint);
+ }
+
+ @Override
- public String detach(String endpoint) {
++ public Response detach(String endpoint) {
+ LOG.info("Detach pipeline element: {}", endpoint);
+ try {
- Response httpResp = Request
++ org.apache.http.client.fluent.Response httpResp = Request
+ .Delete(endpoint)
+ .connectTimeout(CONNECT_TIMEOUT)
+ .execute();
-
- String resp = httpResp.returnContent().asString();
- org.apache.streampipes.model.Response streamPipesResp = new Gson().fromJson(resp,
- org.apache.streampipes.model.Response.class);
-
- return streamPipesResp.toString();
++ return handleResponse(httpResp);
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ }
+ throw new IllegalArgumentException("Failed to detach pipeline element: " + endpoint);
+ }
+
+ @Override
+ public void unregister(){
+ // TODO: unregister element from Consul and
+ NodeManager.getInstance()
+ .retrieveNodeInfoDescription()
+ .setSupportedElements(Collections.emptyList());
+
+ String url = "http://"
+ + NodeControllerConfig.INSTANCE.getBackendHost()
+ + ":"
+ + NodeControllerConfig.INSTANCE.getBackendPort()
+ + "/"
+ + "streampipes-backend/api/v2/users/admin@streampipes.org/nodes"
+ + "/"
+ + NodeControllerConfig.INSTANCE.getNodeControllerId();
+
+ try {
+ String desc = JacksonSerializer.getObjectMapper()
+ .writeValueAsString(NodeManager.getInstance().retrieveNodeInfoDescription());
+
+ Request.Put(url)
+ .bodyString(desc, ContentType.APPLICATION_JSON)
+ .connectTimeout(1000)
+ .socketTimeout(100000)
+ .execute();
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
++ private Response handleResponse(org.apache.http.client.fluent.Response httpResp) throws JsonSyntaxException,
++ IOException {
++ String resp = httpResp.returnContent().asString();
++ return JacksonSerializer
++ .getObjectMapper()
++ .readValue(resp, Response.class);
++ }
++
+ private String makeConsulRegistrationEndpoint() {
+ if (System.getenv(ENV_CONSUL_LOCATION) != null) {
+ return HTTP_PROTOCOL
+ + System.getenv(ENV_CONSUL_LOCATION)
+ + COLON
+ + "8500"
+ + SLASH
+ + "v1/agent/service/register";
+ } else {
+ return HTTP_PROTOCOL
+ + "localhost"
+ + COLON
+ + "8500"
+ + SLASH
+ + "v1/agent/service/register";
+ }
+ }
+
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java
index 2782cce,c5dce8d..168d998
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableLifeCycle.java
@@@ -15,19 -15,32 +15,19 @@@
* limitations under the License.
*
*/
+package org.apache.streampipes.node.controller.container.management.pe;
-@import '../../../../scss/sp/sp-dialog.scss';
+import org.apache.streampipes.container.model.node.InvocableRegistration;
+import org.apache.streampipes.model.Response;
-.customize-section {
- display:flex;
- flex: 1 1 auto;
- padding: 20px;
-}
+public interface InvocableLifeCycle {
-.padding-20 {
- padding: 20px;
-}
+ void register(InvocableRegistration registration);
-.mb-10 {
- margin-bottom: 10px;
-}
+ Response invoke(String endpoint, String payload);
- String detach(String runningInstanceId);
-::ng-deep .pipeline-radio-group .mat-radio-label {
- padding: 0;
-}
++ Response detach(String runningInstanceId);
-.status-text {
- font-size: 14pt;
- margin-top:10px;
-}
+ void unregister();
-.status-subtext {
- font-size: 12pt;
-}
+}
diff --cc streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
index 0000000,0000000..ccfa9c6
new file mode 100644
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/DataStreamRelayManager.java
@@@ -1,0 -1,0 +1,109 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ */
++package org.apache.streampipes.node.controller.container.management.relay;
++
++import com.fasterxml.jackson.core.JsonProcessingException;
++import org.apache.streampipes.model.Response;
++import org.apache.streampipes.model.SpDataStreamRelay;
++import org.apache.streampipes.model.SpDataStreamRelayContainer;
++import org.apache.streampipes.model.graph.DataProcessorInvocation;
++import org.apache.streampipes.model.grounding.TransportProtocol;
++import org.apache.streampipes.node.controller.container.management.relay.metrics.RelayMetrics;
++
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++import java.util.stream.Collectors;
++
++public class DataStreamRelayManager {
++
++ private static DataStreamRelayManager instance = null;
++
++ private DataStreamRelayManager() {}
++
++ public static DataStreamRelayManager getInstance() {
++ if (instance == null) {
++ synchronized (DataStreamRelayManager.class) {
++ if (instance == null)
++ instance = new DataStreamRelayManager();
++ }
++ }
++ return instance;
++ }
++
++ public Response startAdapterDataStreamRelay(SpDataStreamRelayContainer desc) {
++ String strategy = desc.getEventRelayStrategy();
++ String runningInstanceId = desc.getRunningStreamRelayInstanceId();
++ TransportProtocol source = desc.getInputGrounding().getTransportProtocol();
++
++ Map<String, EventRelay> eventRelayMap = new HashMap<>();
++
++ desc.getOutputStreamRelays().forEach(r -> {
++ TransportProtocol target = r.getEventGrounding().getTransportProtocol();
++ EventRelay eventRelay = new EventRelay(source, target, strategy);
++ eventRelay.start();
++ eventRelayMap.put(r.getElementId(), eventRelay);
++ });
++ RunningRelayInstances.INSTANCE.add(desc.getRunningStreamRelayInstanceId(), eventRelayMap);
++ return new Response(runningInstanceId,true,"");
++ }
++
++ public Response stopAdapterDataStreamRelay(String id) {
++ Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(id);
++ if (relay != null) {
++ relay.values().forEach(EventRelay::stop);
++ }
++ RunningRelayInstances.INSTANCE.remove(id);
++ return new Response(id, true, "");
++ }
++
++ public void startPipelineElementDataStreamRelay(DataProcessorInvocation graph) {
++ TransportProtocol source = graph
++ .getOutputStream()
++ .getEventGrounding()
++ .getTransportProtocol();
++
++ String strategy = graph.getEventRelayStrategy();
++ Map<String, EventRelay> eventRelayMap = new HashMap<>();
++
++ List<SpDataStreamRelay> dataStreamRelays = graph.getOutputStreamRelays();
++ dataStreamRelays.forEach(r -> {
++ TransportProtocol target = r.getEventGrounding().getTransportProtocol();
++ EventRelay eventRelay = new EventRelay(source, target, strategy);
++ eventRelay.start();
++ eventRelayMap.put(r.getElementId(), eventRelay);
++ });
++ RunningRelayInstances.INSTANCE.add(graph.getDeploymentRunningInstanceId(), eventRelayMap);
++ }
++
++ public void stopPipelineElementDataStreamRelay(String id) {
++ // Stop relay for invocable if existing
++ Map<String, EventRelay> relay = RunningRelayInstances.INSTANCE.get(id);
++ if (relay != null) {
++ relay.values().forEach(EventRelay::stop);
++ }
++ RunningRelayInstances.INSTANCE.remove(id);
++ }
++
++ public List<RelayMetrics> getAllRelays() {
++ return RunningRelayInstances.INSTANCE.getRunningInstances()
++ .stream()
++ .map(EventRelay::getRelayMetrics)
++ .collect(Collectors.toList());
++ }
++}
diff --cc streampipes-pipeline-management/pom.xml
index 34d43de,d9444f2..5cf9721
--- a/streampipes-pipeline-management/pom.xml
+++ b/streampipes-pipeline-management/pom.xml
@@@ -99,6 -94,11 +99,16 @@@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-serializers-json</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
++ <artifactId>streampipes-serializers-jsonld</artifactId>
++ <version>0.68.0-SNAPSHOT</version>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-storage-management</artifactId>
<version>0.68.0-SNAPSHOT</version>
</dependency>
diff --cc streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index f6706eb,2077607..35509cb
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@@ -24,23 -22,18 +24,23 @@@ import com.google.gson.JsonSyntaxExcept
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
+import org.apache.streampipes.commons.Utils;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
--import org.apache.streampipes.serializers.json.JacksonSerializer;
- import org.apache.streampipes.serializers.jsonld.JsonLdTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
++import org.apache.streampipes.serializers.json.JacksonSerializer;
++import org.apache.streampipes.serializers.jsonld.JsonLdTransformer;
import java.io.IOException;
public class HttpRequestBuilder {
- private NamedStreamPipesEntity payload;
- private String endpointUrl;
+ private final NamedStreamPipesEntity payload;
- private final String belongsTo;
++ private final String endpointUrl;
+
+ private static final Integer CONNECT_TIMEOUT = 10000;
private final static Logger LOG = LoggerFactory.getLogger(HttpRequestBuilder.class);
@@@ -50,20 -43,11 +50,18 @@@
}
public PipelineElementStatus invoke() {
- LOG.info("Invoking element: " + belongsTo);
try {
- String json;
- String jsonDocument = toJson();
- Response httpResp =
- Request.Post(belongsTo).bodyString(jsonDocument, ContentType.APPLICATION_JSON).connectTimeout(10000).execute();
+ if (payload instanceof InvocableStreamPipesEntity) {
+ LOG.info("Invoking pipeline element: " + endpointUrl);
- json = jsonLd();
+ } else {
+ LOG.info("Invoking data stream relay: " + endpointUrl);
- json = jackson();
+ }
++ String json = toJson();
+ Response httpResp = Request
+ .Post(endpointUrl)
+ .bodyString(json, ContentType.APPLICATION_JSON)
+ .connectTimeout(CONNECT_TIMEOUT)
+ .execute();
return handleResponse(httpResp);
} catch (Exception e) {
LOG.error(e.getMessage());
@@@ -72,21 -56,12 +70,20 @@@
}
public PipelineElementStatus detach() {
+ if (payload instanceof InvocableStreamPipesEntity) {
+ LOG.info("Detaching pipeline element: " + endpointUrl);
+ } else {
+ LOG.info("Detaching data stream relay: " + endpointUrl);
+ }
-
try {
- Response httpResp = Request.Delete(belongsTo).connectTimeout(10000).execute();
+ Response httpResp = Request
+ .Delete(endpointUrl)
+ .connectTimeout(CONNECT_TIMEOUT)
+ .execute();
return handleResponse(httpResp);
} catch (Exception e) {
- LOG.error("Could not stop pipeline " + belongsTo, e.getMessage());
- return new PipelineElementStatus(belongsTo, payload.getName(), false, e.getMessage());
+ LOG.error("Could not stop pipeline " + endpointUrl, e.getMessage());
+ return new PipelineElementStatus(endpointUrl, payload.getName(), false, e.getMessage());
}
}
@@@ -96,15 -73,11 +95,11 @@@
return convert(streamPipesResp);
}
- private String jsonLd() throws Exception {
- return Utils.asString(new JsonLdTransformer().toJsonLd(payload));
- }
-
- private String toJson() throws Exception {
- return JacksonSerializer.getObjectMapper().writeValueAsString(payload);
+ private PipelineElementStatus convert(org.apache.streampipes.model.Response response) {
+ return new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(), response.getOptionalMessage());
}
- private String jackson() throws JsonProcessingException {
- private PipelineElementStatus convert(org.apache.streampipes.model.Response response) {
- return new PipelineElementStatus(belongsTo, payload.getName(), response.isSuccess(), response.getOptionalMessage());
++ private String toJson() throws Exception {
+ return JacksonSerializer.getObjectMapper().writeValueAsString(payload);
}
}
diff --cc streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
index 814f026,0000000..1b4cb35
mode 100644,000000..100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvocableEntityUrlGenerator.java
@@@ -1,105 -1,0 +1,106 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.config.consul.ConsulSpConfig;
+import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+
+public class InvocableEntityUrlGenerator extends EndpointUrlGenerator<InvocableStreamPipesEntity> {
+
++ private static final String DATA_PROCESSOR_PREFIX = "sepa";
++ private static final String DATA_SINK_PREFIX = "sec";
+ private static final String DEFAULT_TARGET_NODE_ID = "default";
- private static final String INVOKE_ROUTE = "api/v2/node/container/invoke";
- private static final String DETACH_ROUTE = "api/v2/node/container/detach";
++ private static final String ELEMENT_ROUTE = "api/v2/node/element";
+
+ public InvocableEntityUrlGenerator(InvocableStreamPipesEntity graph) {
+ super(graph);
+ }
+
+ @Override
+ public String generateInvokeEndpoint() {
+ if (isDefaultTarget()) {
+ // default deployments to primary pipeline element
+ return getDefaultEndpoint();
+ } else {
+ // edge deployments to secondary pipeline element
- return getDeploymentTargetEndpoint(INVOKE_ROUTE);
++ return getDeploymentTargetEndpoint(ELEMENT_ROUTE);
+ }
+ }
+
+ @Override
+ public String generateDetachEndpoint() {
+ if (isDefaultTarget()) {
+ // detach primary pipeline element
+ return getDefaultEndpoint() + SLASH + graph.getDeploymentRunningInstanceId();
+ } else {
+ // detach edge deployments to secondary pipeline element
- return getDeploymentTargetEndpoint(DETACH_ROUTE) + SLASH + graph.getDeploymentRunningInstanceId();
++ return getDeploymentTargetEndpoint(ELEMENT_ROUTE) + SLASH + graph.getDeploymentRunningInstanceId();
+ }
+ }
+
+ // Helper methods
+
+ private boolean isDefaultTarget() {
+ return graph.getDeploymentTargetNodeId() == null ||
+ graph.getDeploymentTargetNodeId().equals(DEFAULT_TARGET_NODE_ID);
+ }
+
+ private String getDefaultEndpoint() {
+ return graph.getBelongsTo();
+ }
+
+ private String getDeploymentTargetEndpoint(String route) {
+ modifyInvocableElement();
+ return HTTP_PROTOCOL + graph.getDeploymentTargetNodeHostname() + COLON + graph.getDeploymentTargetNodePort()
+ + SLASH
+ + route
+ + SLASH
+ + getIdentifier()
+ + SLASH
+ + graph.getAppId();
+ }
+
+ private void modifyInvocableElement() {
+ // Necessary because secondary pipeline element description is not stored in backend
+ // It uses information from primary pipeline element. Node controller will locally forward
+ // request accordingly, thus fields must be correct.
+ String route = ConsulSpConfig.SERVICE_ROUTE_PREFIX
+ + graph.getElementEndpointServiceName()
+ + SLASH
+ + ConsulSpConfig.BASE_PREFIX
+ + SLASH
+ + ConsulSpConfig.SECONDARY_NODE_KEY
+ + SLASH
+ + graph.getDeploymentTargetNodeId()
+ + SLASH;
+
+ String host = ConsulUtil.getValueForRoute(route + "SP_HOST", String.class);
+ int port = ConsulUtil.getValueForRoute(route + "SP_PORT", Integer.class);
+ graph.setElementEndpointHostname(host);
+ graph.setElementEndpointPort(port);
+ graph.setBelongsTo(HTTP_PROTOCOL + host + COLON + port + SLASH + getIdentifier() + SLASH + graph.getAppId());
+ graph.setElementId(graph.getBelongsTo() + SLASH + graph.getDeploymentRunningInstanceId());
+ }
+
+ private String getIdentifier() {
- return graph instanceof DataProcessorInvocation ? "sepa" : "sec";
++ return graph instanceof DataProcessorInvocation ? DATA_PROCESSOR_PREFIX : DATA_SINK_PREFIX;
+ }
+
+}
diff --cc streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
index c49a53f,0000000..d37e23a
mode 100644,000000..100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
@@@ -1,84 -1,0 +1,84 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.node;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.node.NodeInfoDescription;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public enum NodeClusterManager {
+ INSTANCE;
+
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
+
+
+ public boolean updateNodeInfoDescription(NodeInfoDescription desc) {
+ boolean successfullyUpdated = false;
+ try {
+ String body = JacksonSerializer.getObjectMapper().writeValueAsString(desc);
+ String url = makeNodeControllerEndpoint(desc);
+
+ LOG.info("Trying to update description for node controller: " + url);
+
+ boolean connected = false;
+ while (!connected) {
+ connected = put(url, body);
+
+ if (!connected) {
+ LOG.info("Retrying in 5 seconds");
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ successfullyUpdated = true;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return successfullyUpdated;
+ }
+
+ private String makeNodeControllerEndpoint(NodeInfoDescription desc) {
- return "http://" + desc.getHostname() + ":" + desc.getPort() + "/api/v2/node/update";
++ return "http://" + desc.getHostname() + ":" + desc.getPort() + "/api/v2/node/info";
+ }
+
+ private boolean put(String url, String body) {
+ try {
+ Request.Put(url)
+ .bodyString(body, ContentType.APPLICATION_JSON)
+ .connectTimeout(1000)
+ .socketTimeout(100000)
+ .execute();
+ return true;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+}
diff --cc ui/package.json
index 31a9149,3808ec0..40c64e2
--- a/ui/package.json
+++ b/ui/package.json
@@@ -36,11 -36,11 +36,11 @@@
"@ngx-loading-bar/core": "5.1.0",
"@ngx-loading-bar/http-client": "5.1.0",
"@stomp/ng2-stompjs": "7.2.0",
- "@swimlane/ngx-charts": "13.0.2",
- "angular-datatables": "9.0.2",
+ "@swimlane/ngx-charts": "16.0.0",
+ "angular-datatables": "^10.0.0",
"angular-gridster2": "8.3.0",
"angular-loading-bar": "0.8.0",
- "angular-material-icons": "0.4.0",
+ "angular-material-icons": "^0.4.0",
"angular-plotly.js": "1.5.0",
"angular-tree-component": "8.5.6",
"angular-ui-tree": "2.9.0",
diff --cc ui/src/app/platform-services/platform.module.ts
index b09c0ad,85962e5..bc6f440
--- a/ui/src/app/platform-services/platform.module.ts
+++ b/ui/src/app/platform-services/platform.module.ts
@@@ -22,7 -22,7 +22,8 @@@ import {PipelineService} from "./apis/p
import {PlatformServicesCommons} from "./apis/commons.service";
import {PipelineElementEndpointService} from "./apis/pipeline-element-endpoint.service";
import {FilesService} from "./apis/files.service";
+import {NodeService} from "./apis/node.service";
+ import {MeasurementUnitsService} from "./apis/measurement-units.service";
@NgModule({
imports: [],