You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/05 10:27:43 UTC
[incubator-streampipes] branch edge-extensions updated: add sinks
to be hosted on edge nodes, add mapDB file persistence to nodectlr,
fix rollback
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/edge-extensions by this push:
new 04c97b6 add sinks to be hosted on edge nodes, add mapDB file persistence to nodectlr, fix rollback
04c97b6 is described below
commit 04c97b64035b09af570f22dbba53f7dc9467a591
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Fri Mar 5 11:27:17 2021 +0100
add sinks to be hosted on edge nodes, add mapDB file persistence to nodectlr, fix rollback
---
.../extensions/ExtensionsModelSubmitter.java | 19 ++++--
.../standalone/init/StandaloneModelSubmitter.java | 16 ++++-
.../container/util/NodeControllerUtil.java | 13 ++--
streampipes-node-controller-container/pom.xml | 12 ++--
...ource.java => ContainerDeploymentResource.java} | 2 +-
.../container/api/HealthCheckResource.java | 2 +-
.../container/api/InvocableEntityResource.java | 1 -
.../api/NodeControllerResourceConfig.java | 2 +-
.../container/config/NodeControllerConfig.java | 13 ++--
...RunningInstances.java => RunningInstances.java} | 2 +-
.../orchestrator/RunningContainerInstances.java | 20 +++---
.../management/pe/RunningInvocableInstances.java | 22 ++++---
.../management/relay/RunningRelayInstances.java | 5 +-
.../controller/container/storage/CRUDStorage.java | 15 +++--
.../controller/container/storage/MapDBImpl.java | 73 ++++++++++++++++++++++
.../manager/execution/http/ElementSubmitter.java | 32 +++++++++-
.../manager/execution/http/GraphSubmitter.java | 2 +-
.../pipeline/PipelineMigrationExecutor.java | 15 +++--
.../execution/pipeline/migration/Command.java | 23 -------
.../migration/PipelineElementStopCommand.java | 30 ---------
.../pipeline/migration/RelayStartCommand.java | 30 ---------
.../pipeline/migration/RelayStopCommand.java | 30 ---------
.../manager/node/AbstractClusterManager.java | 72 ++++++++++++++++-----
.../manager/node/NodeClusterManager.java | 14 +++--
.../streampipes/manager/node/NodeSyncOptions.java | 2 +-
.../node-configuration-details.component.html | 7 ---
.../node-configuration-details.component.ts | 34 ++++++++--
27 files changed, 299 insertions(+), 209 deletions(-)
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index 967519c..8d45a65 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -52,9 +52,8 @@ import org.springframework.context.annotation.Import;
import javax.annotation.PreDestroy;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
+import java.util.stream.Collectors;
@Configuration
@EnableAutoConfiguration
@@ -99,7 +98,7 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<EdgeExtens
conf.getId(),
conf.getHost(),
conf.getPort(),
- DeclarersSingleton.getInstance().getEpaDeclarers());
+ generateSupportedAppIds());
String nodeControllerUrl = PROTOCOL + conf.getNodeControllerHost() + COLON + conf.getNodeControllerPort();
@@ -149,6 +148,18 @@ public abstract class ExtensionsModelSubmitter extends ModelSubmitter<EdgeExtens
}
}
+ private List<String> generateSupportedAppIds() {
+ List<String> supportedAppIds = new ArrayList<>();
+
+ List<String> dataProcessorsAppIds = new ArrayList<>(DeclarersSingleton.getInstance().getEpaDeclarers().keySet());
+ List<String> dataSinksAppIds = new ArrayList<>(DeclarersSingleton.getInstance().getConsumerDeclarers().keySet());
+
+ supportedAppIds.addAll(dataProcessorsAppIds);
+ supportedAppIds.addAll(dataSinksAppIds);
+
+ return supportedAppIds;
+ }
+
private ConnectWorkerContainer getContainerDescription(String connectWorkerHost, int connectWorkerPort,
boolean runsOnEdgeNode) {
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
index 2fa53ae..f8e25ce 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/StandaloneModelSubmitter.java
@@ -32,7 +32,9 @@ import org.apache.streampipes.container.init.RunningInstances;
import org.apache.streampipes.container.model.PeConfig;
import org.apache.streampipes.container.util.ConsulUtil;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import javax.annotation.PreDestroy;
@@ -65,7 +67,7 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig>
peConfig.getId(),
peConfig.getHost(),
peConfig.getPort(),
- DeclarersSingleton.getInstance().getEpaDeclarers());
+ generateSupportedAppIds());
} else {
// primary
ConsulUtil.registerPeService(
@@ -75,6 +77,18 @@ public abstract class StandaloneModelSubmitter extends ModelSubmitter<PeConfig>
}
}
+ private List<String> generateSupportedAppIds() {
+ List<String> supportedAppIds = new ArrayList<>();
+
+ List<String> dataProcessorsAppIds = new ArrayList<>(DeclarersSingleton.getInstance().getEpaDeclarers().keySet());
+ List<String> dataSinksAppIds = new ArrayList<>(DeclarersSingleton.getInstance().getConsumerDeclarers().keySet());
+
+ supportedAppIds.addAll(dataProcessorsAppIds);
+ supportedAppIds.addAll(dataSinksAppIds);
+
+ return supportedAppIds;
+ }
+
@PreDestroy
public void onExit() {
LOG.info("Shutting down StreamPipes pipeline element container...");
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
index be7ee90..497d537 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/util/NodeControllerUtil.java
@@ -44,19 +44,18 @@ public class NodeControllerUtil {
private static final String NODE_CONTROLLER_CONTAINER_HOST = "SP_NODE_CONTROLLER_CONTAINER_HOST";
private static final String NODE_CONTROLLER_CONTAINER_PORT = "SP_NODE_CONTROLLER_CONTAINER_PORT";
- public static void register(String serviceID, String host, int port,
- Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+ public static void register(String serviceID, String host, int port, List<String> supportedAppIds) {
register(PE_TAG, makeSvcId(host, serviceID), host, port,
- Arrays.asList(PE_TAG, SECONDARY_PE_IDENTIFIER_TAG), epaDeclarers);
+ Arrays.asList(PE_TAG, SECONDARY_PE_IDENTIFIER_TAG), supportedAppIds);
}
public static void register(String svcName, String svcId, String host, int port, List<String> tag,
- Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+ List<String> supportedAppIds) {
boolean connected = false;
while (!connected) {
LOG.info("Trying to register pipeline element container at node controller: " + makeRegistrationEndpoint());
- String body = createSvcBody(svcName, svcId, host, port, tag, epaDeclarers);
+ String body = createSvcBody(svcName, svcId, host, port, tag, supportedAppIds);
connected = registerSvcHttpClient(body);
if (!connected) {
@@ -87,7 +86,7 @@ public class NodeControllerUtil {
}
private static String createSvcBody(String name, String id, String host, int port, List<String> tags,
- Map<String, SemanticEventProcessingAgentDeclarer> epaDeclarers) {
+ List<String> supportedAppIds) {
try {
ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
String healthCheckURL = HTTP_PROTOCOL + host + COLON + port;
@@ -101,7 +100,7 @@ public class NodeControllerUtil {
InvocableRegistration svcBody = new InvocableRegistration();
svcBody.setConsulServiceRegistrationBody(body);
- svcBody.setSupportedPipelineElementAppIds(new ArrayList<>(epaDeclarers.keySet()));
+ svcBody.setSupportedPipelineElementAppIds(supportedAppIds);
return JacksonSerializer.getObjectMapper().writeValueAsString(svcBody);
} catch (JsonProcessingException e) {
diff --git a/streampipes-node-controller-container/pom.xml b/streampipes-node-controller-container/pom.xml
index d5e0f49..8b8b60f 100644
--- a/streampipes-node-controller-container/pom.xml
+++ b/streampipes-node-controller-container/pom.xml
@@ -76,13 +76,6 @@
</dependency>
<!--external dependencies-->
- <!-- parse yaml config -->
-<!-- <dependency>-->
-<!-- <groupId>org.yaml</groupId>-->
-<!-- <artifactId>snakeyaml</artifactId>-->
-<!-- <version>1.21</version>-->
-<!-- </dependency>-->
- <!-- docker client for java -->
<dependency>
<groupId>com.spotify</groupId>
<artifactId>docker-client</artifactId>
@@ -135,6 +128,11 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mapdb</groupId>
+ <artifactId>mapdb</artifactId>
+ <version>3.0.8</version>
+ </dependency>
</dependencies>
<build>
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerDeploymentResource.java
similarity index 96%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerResource.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerDeploymentResource.java
index 21733ef..b6899df 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/ContainerDeploymentResource.java
@@ -25,7 +25,7 @@ import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
@Path("/api/v2/node/container")
-public class ContainerResource extends AbstractResource {
+public class ContainerDeploymentResource extends AbstractResource {
@GET
@Produces(MediaType.APPLICATION_JSON)
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
index 8c593a4..6a4f88e 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/HealthCheckResource.java
@@ -25,7 +25,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-@Path("/")
+@Path("/healthy")
public class HealthCheckResource extends AbstractResource {
@GET
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
index 6028438..03a0a4f 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/InvocableEntityResource.java
@@ -25,7 +25,6 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
import org.apache.streampipes.node.controller.container.management.pe.RunningInvocableInstances;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
index ac277e8..893bcb3 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/api/NodeControllerResourceConfig.java
@@ -29,6 +29,6 @@ public class NodeControllerResourceConfig extends ResourceConfig {
register(InvocableEntityResource.class);
register(DataStreamRelayResource.class);
register(ConnectResource.class);
- register(ContainerResource.class);
+ register(ContainerDeploymentResource.class);
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
index dc780a4..9937254 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/config/NodeControllerConfig.java
@@ -20,8 +20,10 @@ package org.apache.streampipes.node.controller.container.config;
import org.apache.streampipes.config.SpConfig;
import org.apache.streampipes.model.node.resources.fielddevice.FieldDeviceAccessResource;
+import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public enum NodeControllerConfig {
INSTANCE;
@@ -93,7 +95,6 @@ public enum NodeControllerConfig {
String.class);
}
- // TODO: should be flexibly set due to node broker technology used
public int getNodeBrokerPort() {
return getEnvOrDefault(
ConfigKeys.NODE_BROKER_CONTAINER_PORT,
@@ -102,15 +103,13 @@ public enum NodeControllerConfig {
}
public List<String> getNodeLocations() {
- return System.getenv()
- .entrySet()
- .stream()
- .filter(e -> (e.getKey().contains(ConfigKeys.NODE_LOCATION)))
- .map(x -> x.getKey().replace(ConfigKeys.NODE_LOCATION + "_", "").toLowerCase() + "=" + x.getValue())
+ return System.getenv().entrySet().stream()
+ .filter(e -> e.getKey().contains(ConfigKeys.NODE_LOCATION))
+ .map(v -> v.getValue().split(";"))
+ .flatMap(Stream::of)
.collect(Collectors.toList());
}
- // TODO: get supported PE programmatically instead of environment variables
public List<String> getSupportedPipelineElements() {
return System.getenv()
.entrySet()
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
similarity index 96%
rename from streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
index afb797d..7091646 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/IRunningInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
@@ -17,7 +17,7 @@
*/
package org.apache.streampipes.node.controller.container.management;
-public interface IRunningInstances<T> {
+public interface RunningInstances<T> {
void add(String id, T value);
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
index 3393a74..9b2e3a0 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
@@ -18,33 +18,39 @@
package org.apache.streampipes.node.controller.container.management.orchestrator;
import org.apache.streampipes.model.node.container.DockerContainer;
-import org.apache.streampipes.node.controller.container.management.IRunningInstances;
+import org.apache.streampipes.node.controller.container.management.RunningInstances;
+import org.apache.streampipes.node.controller.container.storage.MapDBImpl;
+import java.io.File;
import java.util.HashMap;
import java.util.Map;
-public enum RunningContainerInstances implements IRunningInstances<DockerContainer> {
+public enum RunningContainerInstances implements RunningInstances<DockerContainer> {
INSTANCE;
- private final Map<String, DockerContainer> runningInstances = new HashMap<>();
+ private final MapDBImpl mapDB;
+
+ RunningContainerInstances() {
+ this.mapDB = new MapDBImpl(new File("containers.db"));
+ }
@Override
public void add(String id, DockerContainer container) {
- runningInstances.put(id, container);
+ mapDB.create(id, container);
}
@Override
public boolean isRunning(String id) {
- return runningInstances.get(id) != null;
+ return mapDB.retrieve(id) != null;
}
@Override
public DockerContainer get(String id) {
- return isRunning(id) ? runningInstances.get(id) : null;
+ return isRunning(id) ? mapDB.retrieve(id) : null;
}
@Override
public void remove(String id) {
- runningInstances.remove(id);
+ mapDB.delete(id);
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
index 3067f71..bfa68a1 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
@@ -18,33 +18,37 @@
package org.apache.streampipes.node.controller.container.management.pe;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.node.controller.container.management.IRunningInstances;
+import org.apache.streampipes.node.controller.container.storage.MapDBImpl;
+import org.apache.streampipes.node.controller.container.management.RunningInstances;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.File;
-public enum RunningInvocableInstances implements IRunningInstances<InvocableStreamPipesEntity> {
+public enum RunningInvocableInstances implements RunningInstances<InvocableStreamPipesEntity> {
INSTANCE;
- private final Map<String, InvocableStreamPipesEntity> runningInvocableInstances = new HashMap<>();
+ private final MapDBImpl mapDB;
+
+ RunningInvocableInstances() {
+ this.mapDB = new MapDBImpl(new File("invocables.db"));
+ }
@Override
public void add(String id, InvocableStreamPipesEntity value) {
- runningInvocableInstances.put(id, value);
+ mapDB.create(id, value);
}
@Override
public boolean isRunning(String id) {
- return runningInvocableInstances.get(id) != null;
+ return mapDB.retrieve(id) != null;
}
@Override
public InvocableStreamPipesEntity get(String id) {
- return runningInvocableInstances.get(id);
+ return mapDB.retrieve(id);
}
@Override
public void remove(String id) {
- runningInvocableInstances.remove(id);
+ mapDB.delete(id);
}
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index e95ab57..085ed82 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -17,12 +17,12 @@
*/
package org.apache.streampipes.node.controller.container.management.relay;
-import org.apache.streampipes.node.controller.container.management.IRunningInstances;
+import org.apache.streampipes.node.controller.container.management.RunningInstances;
import java.util.*;
import java.util.stream.Collectors;
-public enum RunningRelayInstances implements IRunningInstances<Map<String,EventRelay>> {
+public enum RunningRelayInstances implements RunningInstances<Map<String,EventRelay>> {
INSTANCE;
private final Map<String, Map<String, EventRelay>> runningInstances = new HashMap<>();
@@ -53,6 +53,5 @@ public enum RunningRelayInstances implements IRunningInstances<Map<String,EventR
.map(Map::values)
.flatMap(Collection::stream)
.collect(Collectors.toList());
- //return new ArrayList<>(runningInstances.values().forEach(e -> e.values()));
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
similarity index 75%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
rename to streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
index 4afbf46..c588487 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStartCommand.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
@@ -15,17 +15,16 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.migration;
+package org.apache.streampipes.node.controller.container.storage;
-public class PipelineElementStartCommand implements Command {
+import java.util.List;
- @Override
- public void execute() {
+public interface CRUDStorage {
+ <T> void create(String id, T value);
- }
+ <T>T retrieve(String id);
- @Override
- public void rollback() {
+ <T> void update(String id, T value);
- }
+ void delete(String id);
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
new file mode 100644
index 0000000..8483fbd
--- /dev/null
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.node.controller.container.storage;
+
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.Serializer;
+
+import java.io.File;
+import java.util.concurrent.ConcurrentMap;
+
+public class MapDBImpl implements CRUDStorage {
+
+ private static final String DB_STORAGE_PATH = "/var/lib/streampipes/";
+
+ private DB db;
+ private ConcurrentMap<String, Object> map;
+
+ public MapDBImpl(File dbFile) {
+ if("true".equals(System.getenv("SP_DEBUG"))) {
+ db = DBMaker
+ .memoryDB()
+ .closeOnJvmShutdown()
+ .make();
+ } else {
+ db = DBMaker
+ .fileDB(DB_STORAGE_PATH + dbFile)
+ .closeOnJvmShutdown()
+ .make();
+ }
+ map = db.hashMap("nodectlcache", Serializer.STRING, Serializer.JAVA)
+ .createOrOpen();
+ }
+
+ @Override
+ public <T> void create(String id, T value) {
+ map.put(id, value);
+ }
+
+ @Override
+ public <T> T retrieve(String id) {
+ return (T) map.get(id);
+ }
+
+ @Override
+ public <T> void update(String id, T value) {
+ map.put(id, value);
+ }
+
+ @Override
+ public void delete(String id) {
+ map.remove(id);
+ }
+
+ public void close() {
+ db.close();
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
index 57715b9..c6de47c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ElementSubmitter.java
@@ -130,11 +130,16 @@ public abstract class ElementSubmitter {
private void rollbackInvokedEntities(PipelineOperationStatus status) {
for (PipelineElementStatus s : status.getElementStatus()) {
if (s.isSuccess()) {
- Optional<InvocableStreamPipesEntity> graphs = findGraphs(s.getElementId());
+ Optional<InvocableStreamPipesEntity> graphs = compareAndFindGraphs(s.getElementId());
+ Optional<SpDataStreamRelayContainer> relays = compareAndFindRelays(s.getElementId());
graphs.ifPresent(graph -> {
- LOG.info("Rolling back element " + graph.getElementId());
+ LOG.info("Rolling back element " + s.getElementId());
makeDetachHttpRequest(new InvocableEntityUrlGenerator(graph), graph);
});
+ relays.ifPresent(relay -> {
+ LOG.info("Rolling back element " + s.getElementId());
+ makeDetachHttpRequest(new StreamRelayEndpointUrlGenerator(relay), relay);
+ });
}
}
}
@@ -148,10 +153,33 @@ public abstract class ElementSubmitter {
return status;
}
+ // Old: this filter does not work. It ends up comparing the endpoint of the primary pipeline element with the
+ // one of nodectl, because the endpoint in PipelineOperationStatus (set in HttpRequestBuilder) is the nodectl endpoint.
+ @Deprecated
private Optional<InvocableStreamPipesEntity> findGraphs(String elementId) {
return graphs.stream().filter(i -> i.getBelongsTo().equals(elementId)).findFirst();
}
+ private Optional<InvocableStreamPipesEntity> compareAndFindGraphs(String elementId) {
+ return graphs.stream().filter(graph -> matchingEndpoints(graph, elementId)).findFirst();
+ }
+
+ private Optional<SpDataStreamRelayContainer> compareAndFindRelays(String elementId) {
+ return relays.stream().filter(relay -> matchingEndpoints(relay, elementId)).findFirst();
+ }
+
+ private boolean matchingEndpoints(NamedStreamPipesEntity entity, String endpoint) {
+ boolean matching = false;
+ if (entity instanceof InvocableStreamPipesEntity) {
+ matching = new InvocableEntityUrlGenerator((InvocableStreamPipesEntity) entity)
+ .generateInvokeEndpoint().equals(endpoint);
+ } else if (entity instanceof SpDataStreamRelayContainer) {
+ matching = new StreamRelayEndpointUrlGenerator((SpDataStreamRelayContainer) entity)
+ .generateInvokeEndpoint().equals(endpoint);
+ }
+ return matching;
+ }
+
protected boolean allInvocableEntitiesRunning(PipelineOperationStatus status) {
return status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index 726d1a6..2285355 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -74,7 +74,7 @@ public class GraphSubmitter extends ElementSubmitter {
return verifyPipelineOperationStatus(
status,
"Successfully started pipeline " + pipelineName,
- "Could not start pipeline" + pipelineName,
+ "Could not start pipeline " + pipelineName,
true);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index e610d24..49cf71f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -80,7 +80,6 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
this.relaysToBeDeleted = new ArrayList<>();
}
- // TODO: refactor!
public PipelineOperationStatus migratePipelineElement() {
PipelineOperationStatus status = initPipelineOperationStatus();
@@ -308,9 +307,17 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
}
private void findPredecessorsInMigrationPipeline(PipelineGraph pipelineGraphAfterMigration) {
- PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).forEach(stream ->
- predecessorsAfterMigration.addAll(getPredecessors(stream, migrationEntity.getTargetElement(),
- pipelineGraphAfterMigration, new ArrayList<>())));
+ // get unique list of predecessors
+ List<NamedStreamPipesEntity> predecessors = PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).stream()
+ .map(stream -> getPredecessors(stream, migrationEntity.getTargetElement(),
+ pipelineGraphAfterMigration, new ArrayList<>()))
+ .flatMap(List::stream)
+ .collect(Collectors.toList())
+ .stream()
+ .distinct()
+ .collect(Collectors.toList());
+
+ predecessorsAfterMigration.addAll(predecessors);
}
private List<SpDataSet> findDataSets() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
deleted file mode 100644
index fd4bdd9..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/Command.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.migration;
-
-public interface Command {
- void execute();
- void rollback();
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
deleted file mode 100644
index 5a2bbd6..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/PipelineElementStopCommand.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.migration;
-
-public class PipelineElementStopCommand implements Command {
- @Override
- public void execute() {
-
- }
-
- @Override
- public void rollback() {
-
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
deleted file mode 100644
index b2e242d..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStartCommand.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.migration;
-
-public class RelayStartCommand implements Command {
- @Override
- public void execute() {
-
- }
-
- @Override
- public void rollback() {
-
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
deleted file mode 100644
index 1938268..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/migration/RelayStopCommand.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.migration;
-
-public class RelayStopCommand implements Command {
- @Override
- public void execute() {
-
- }
-
- @Override
- public void rollback() {
-
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
index dce1e06..0973c3e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/AbstractClusterManager.java
@@ -28,6 +28,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
public abstract class AbstractClusterManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterManager.class.getCanonicalName());
@@ -36,8 +38,6 @@ public abstract class AbstractClusterManager {
private static final String COLON = ":";
private static final long RETRY_INTERVAL_MS = 5000;
private static final int CONNECT_TIMEOUT = 1000;
- private static final String BASE_NODE_CONTROLLER_INFO_ROUTE = "/api/v2/node/info";
- private static final String BASE_NODE_CONTROLLER_RELAY_ROUTE = "/api/v2/node/stream/relay";
public enum RequestOptions {
GET,POST,PUT,DELETE
@@ -46,19 +46,55 @@ public abstract class AbstractClusterManager {
+ /**
+ * Dispatches a synchronization request for {@code element} to its node controller.
+ *
+ * <p>ACTIVATE_NODE and DEACTIVATE_NODE issue a POST and UPDATE_NODE a PUT against the node info
+ * routes; RESTART_RELAYS issues a POST against the relay invoke route. The last argument passed
+ * to {@code sync} is its {@code withBody} flag. HEALTHY does not go through {@code sync} but
+ * delegates to {@code healthCheck}.
+ *
+ * @param element description object handed on to {@code sync} or {@code healthCheck}
+ * @param sync the operation to perform
+ * @return the result of the delegated call, or {@code false} for an option not handled here
+ */
protected static <T> boolean syncWithNodeController(T element, NodeSyncOptions sync) {
switch (sync) {
case ACTIVATE_NODE:
- return sync(element, "/activate", RequestOptions.POST, false, NodeInfoDescription.class);
+ return sync(element, "/api/v2/node/info/activate", RequestOptions.POST, false);
case DEACTIVATE_NODE:
- return sync(element, "/deactivate", RequestOptions.POST, false, NodeInfoDescription.class);
+ return sync(element, "/api/v2/node/info/deactivate", RequestOptions.POST, false);
case UPDATE_NODE:
- return sync(element, "", RequestOptions.PUT, true, NodeInfoDescription.class);
+ return sync(element, "/api/v2/node/info", RequestOptions.PUT, true);
case RESTART_RELAYS:
- return sync(element, "/invoke", RequestOptions.POST, true, SpDataStreamRelayContainer.class);
+ return sync(element, "/api/v2/node/stream/relay/invoke", RequestOptions.POST, true);
+ case HEALTHY:
+ return healthCheck(element, "/healthy");
default:
return false;
}
}
- private static <T> boolean sync(T element, String subroute, RequestOptions request, boolean withBody, Class<?> type) {
+    /**
+     * Checks whether a node controller accepts TCP connections on its configured host and port.
+     *
+     * <p>Up to five connection attempts are made, each with a 500 ms connect timeout and a
+     * one second pause after a failed attempt. No HTTP request is sent; {@code route} is only
+     * used to build the endpoint string that appears in the log output.
+     *
+     * @param element node to probe; anything that is not a {@link NodeInfoDescription} is
+     *                reported as not alive instead of failing with a {@link ClassCastException}
+     * @param route route appended to the node controller address for logging
+     * @return {@code true} as soon as one connection attempt succeeds, {@code false} otherwise
+     */
+    private static <T> boolean healthCheck(T element, String route) {
+        if (!(element instanceof NodeInfoDescription)) {
+            LOG.warn("Health check is only supported for node descriptions, got={}", element);
+            return false;
+        }
+
+        NodeInfoDescription desc = (NodeInfoDescription) element;
+        String url = generateEndpoint(element, route);
+        String nodeCtlId = desc.getNodeControllerId();
+        String host = desc.getHostname();
+        int port = desc.getPort();
+
+        final int retries = 5;
+        final int connectTimeoutMs = 500;
+        final long retryPauseMs = 1000;
+
+        boolean isAlive = false;
+        for (int attempt = 1; attempt <= retries; attempt++) {
+            LOG.info("Trying to health check node controller={} ({})", nodeCtlId, attempt + "/" + retries);
+
+            // try-with-resources releases the socket after a successful and a failed connect alike
+            try (Socket socket = new Socket()) {
+                socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
+                isAlive = true;
+                break;
+            } catch (IOException e) {
+                LOG.debug("Health check attempt {}/{} for node controller={} failed: {}",
+                        attempt, retries, nodeCtlId, e.getMessage());
+            }
+
+            // Back off before the next attempt; there is nothing to wait for after the last one.
+            if (attempt < retries) {
+                try {
+                    Thread.sleep(retryPauseMs);
+                } catch (InterruptedException e) {
+                    // Preserve the interrupt for the caller and stop probing.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+
+        LOG.info(isAlive ? "Successfully health check node controller=" + url :
+                "Could not perform health check node with controller=" + url);
+        return isAlive;
+    }
+
+ private static <T> boolean sync(T element, String route, RequestOptions request, boolean withBody) {
boolean synced = false;
String body = "{}";
@@ -66,7 +102,7 @@ public abstract class AbstractClusterManager {
body = jackson(element);
}
- String url = generateEndpoint(element, subroute, type);
+ String url = generateEndpoint(element, route);
LOG.info("Trying to sync with node controller=" + url);
boolean connected = false;
@@ -93,23 +129,21 @@ public abstract class AbstractClusterManager {
// Helpers
+ /**
+ * Builds the node controller endpoint for {@code desc} by concatenating PROTOCOL, host, COLON,
+ * port and {@code route}.
+ *
+ * <p>A {@link NodeInfoDescription} contributes its hostname and port. Every other argument is
+ * cast to {@link SpDataStreamRelayContainer} and contributes its deployment target node
+ * hostname and port, so passing an unrelated type fails with a {@link ClassCastException}.
+ *
+ * @param desc node description or relay container that identifies the target node
+ * @param route full route appended after the port
+ * @return the assembled endpoint string
+ */
- private static <T> String generateEndpoint(T desc, String subroute, Class<?> type) {
- if (type.equals(NodeInfoDescription.class)) {
+ private static <T> String generateEndpoint(T desc, String route) {
+ if (desc instanceof NodeInfoDescription) {
NodeInfoDescription d = (NodeInfoDescription) desc;
return PROTOCOL
+ d.getHostname()
+ COLON
+ d.getPort()
- + BASE_NODE_CONTROLLER_INFO_ROUTE
- + subroute;
+ + route;
} else {
SpDataStreamRelayContainer d = (SpDataStreamRelayContainer) desc;
return PROTOCOL
+ d.getDeploymentTargetNodeHostname()
+ COLON
+ d.getDeploymentTargetNodePort()
- + BASE_NODE_CONTROLLER_RELAY_ROUTE
- + subroute;
+ + route;
}
}
@@ -121,6 +155,16 @@ public abstract class AbstractClusterManager {
}
}
+    /**
+     * Issues a GET request against the given endpoint and returns the response content as text.
+     *
+     * @param url endpoint to call; CONNECT_TIMEOUT is applied as connect timeout
+     * @return the response content converted with {@code toString()}
+     * @throws SpRuntimeException wrapping any {@link IOException} raised while executing the request
+     */
+    private static String get(String url) {
+        final String responseBody;
+        try {
+            Request getRequest = Request.Get(url).connectTimeout(CONNECT_TIMEOUT);
+            responseBody = getRequest.execute().returnContent().toString();
+        } catch (IOException e) {
+            throw new SpRuntimeException("Something went wrong during GET request to node controller", e);
+        }
+        return responseBody;
+    }
+
private static boolean put(String url, String body) {
try {
Request.Put(url)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
index 007dc15..5b85135 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeClusterManager.java
@@ -28,21 +28,25 @@ import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
public class NodeClusterManager extends AbstractClusterManager {
private static final Logger LOG = LoggerFactory.getLogger(NodeClusterManager.class.getCanonicalName());
-
+    /**
+     * Returns the active nodes from node storage whose node controller passes the HEALTHY sync
+     * check, in storage order.
+     */
public static List<NodeInfoDescription> getAvailableNodes() {
- //return new AvailableNodesFetcher().fetchNodes();
- return getNodeStorageApi().getAllActiveNodes();
+        return getNodeStorageApi().getAllActiveNodes().stream()
+                .filter(node -> syncWithNodeController(node, NodeSyncOptions.HEALTHY))
+                .collect(Collectors.toCollection(ArrayList::new));
}
-
+ /** Returns all nodes held by the node storage; no activity or health filtering is applied here. */
public static List<NodeInfoDescription> getAllNodes() {
- //return new AvailableNodesFetcher().fetchNodes();
return getNodeStorageApi().getAllNodes();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
index 644e688..5ba7567 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/node/NodeSyncOptions.java
@@ -18,5 +18,5 @@
package org.apache.streampipes.manager.node;
+/** Operations that can be requested through {@code AbstractClusterManager.syncWithNodeController}. */
public enum NodeSyncOptions {
- ACTIVATE_NODE, DEACTIVATE_NODE, UPDATE_NODE,RESTART_RELAYS;
+    ACTIVATE_NODE,
+    DEACTIVATE_NODE,
+    UPDATE_NODE,
+    RESTART_RELAYS,
+    HEALTHY
}
diff --git a/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.html b/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.html
index 6712a6e..37b068f 100644
--- a/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.html
+++ b/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.html
@@ -16,13 +16,6 @@
~
-->
-<!--<div class="sp-dialog-container">-->
-<!-- <div class="sp-dialog-content padding-20">-->
-<!-- <div fxFlex="100" fxLayout="column">-->
-<!-- <p>node-settings works!</p>-->
-<!-- </div>-->
-<!-- </div>-->
-<!--</div>-->
<div class="sp-dialog-container">
<div class="sp-dialog-content padding-20">
<div fxFlex="100" fxLayout="column">
diff --git a/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts b/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts
index ff2579d..bd9d555 100644
--- a/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts
+++ b/ui/src/app/configuration/node-configuration/node-configuration-details/node-configuration-details.component.ts
@@ -17,11 +17,16 @@
*/
import {Component, Input, OnInit} from '@angular/core';
-import {NodeInfoDescription} from "../../../core-model/gen/streampipes-model";
+import {Message, NodeInfoDescription, PipelineOperationStatus} from "../../../core-model/gen/streampipes-model";
import {FormGroup} from "@angular/forms";
import {DialogRef} from "../../../core-ui/dialog/base-dialog/dialog-ref";
import {MatChipInputEvent} from "@angular/material/chips";
import {COMMA, ENTER} from "@angular/cdk/keycodes";
+import {NodeService} from "../../../platform-services/apis/node.service";
+import {PipelineStatusDialogComponent} from "../../../pipelines/dialog/pipeline-status/pipeline-status-dialog.component";
+import {PanelType} from "../../../core-ui/dialog/base-dialog/base-dialog.model";
+import {DialogService} from "../../../core-ui/dialog/base-dialog/base-dialog.service";
+import {MatSnackBar} from "@angular/material/snack-bar";
export interface Fruit {
name: string;
@@ -38,7 +43,7 @@ export class NodeConfigurationDetailsComponent implements OnInit {
saving: boolean = false;
saved: boolean = false;
advancedSettings: boolean;
-
+ errorMessage: string = '';
visible = true;
selectable = true;
removable = true;
@@ -49,7 +54,9 @@ export class NodeConfigurationDetailsComponent implements OnInit {
@Input()
node: NodeInfoDescription;
- constructor(private dialogRef: DialogRef<NodeConfigurationDetailsComponent>) {
+ constructor(private nodeService: NodeService,
+ private dialogRef: DialogRef<NodeConfigurationDetailsComponent>,
+ private _snackBar: MatSnackBar) {
}
ngOnInit(): void {
@@ -58,13 +65,32 @@ export class NodeConfigurationDetailsComponent implements OnInit {
}
updateNodeInfo() {
-
+        // Copy the component's tmpTags onto the node before it is sent to the node service.
+        this.node.staticNodeMetadata.locationTags = this.tmpTags;
+
+        this.nodeService.updateNodeState(this.node).subscribe({
+            // NOTE(review): the response type is not visible here; only `success` is read.
+            next: (statusMessage: any) => {
+                if (statusMessage.success) {
+                    this.openSnackBar("Node successfully updated");
+                    this.hide();
+                } else {
+                    this.openSnackBar("Node not updated");
+                }
+            },
+            // If the request observable errors, still tell the user instead of failing silently.
+            error: () => this.openSnackBar("Node not updated"),
+        });
}
+ /** Closes this dialog through its DialogRef. */
hide() {
this.dialogRef.close();
}
+    /** Opens a snack bar showing `message` with a "close" action and a duration setting of 3000. */
+    openSnackBar(message: string) {
+        const snackBarConfig = {duration: 3000};
+        this._snackBar.open(message, "close", snackBarConfig);
+    }
+
+
add(event: MatChipInputEvent): void {
const input = event.input;
const value = event.value;