You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/17 09:17:23 UTC
[incubator-streampipes] 02/02: Add auto-restart for pipeline
elements, adapters, relays in case of system reboot or container restart
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 45b1f3c0271b636989131b2a1ac4f0711fc38f97
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed Mar 17 10:14:30 2021 +0100
Add auto-restart for pipeline elements, adapters, relays in case of system reboot or container restart
---
.../api/InvocablePipelineElementResource.java | 42 +++++++++++++---------
.../controller/container/NodeControllerInit.java | 9 ++++-
.../container/management/RunningInstances.java | 4 +++
.../container/management/node/NodeManager.java | 17 ++++++---
.../orchestrator/RunningContainerInstances.java | 6 ++++
.../management/pe/InvocableElementManager.java | 23 ++++++++++++
.../management/pe/RunningInvocableInstances.java | 7 ++++
.../management/relay/RunningRelayInstances.java | 5 +++
.../controller/container/storage/CRUDStorage.java | 2 ++
.../controller/container/storage/MapDBImpl.java | 7 ++++
10 files changed, 99 insertions(+), 23 deletions(-)
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
index e612c16..a56a7d3 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
@@ -55,26 +55,30 @@ public abstract class InvocablePipelineElementResource<I extends InvocableStream
@JacksonSerialized
public javax.ws.rs.core.Response invokeRuntime(@PathParam("elementId") String elementId, I graph) {
- try {
- if (isDebug()) {
- graph = createGroundingDebugInformation(graph);
+ // in case element container is still running when backend/node controller crashes
+ if (!alreadyRunning(graph)) {
+ try {
+ if (isDebug()) {
+ graph = createGroundingDebugInformation(graph);
+ }
+
+ InvocableDeclarer declarer = (InvocableDeclarer) getDeclarerById(elementId);
+
+ if (declarer != null) {
+ //String runningInstanceId = getInstanceId(graph.getElementId(), elementId);
+ String runningInstanceId = graph.getDeploymentRunningInstanceId();
+ RunningInstances.INSTANCE.add(runningInstanceId, graph, declarer.getClass().newInstance());
+ Response resp = RunningInstances.INSTANCE.getInvocation(runningInstanceId).invokeRuntime(graph);
+ return ok(resp);
+ }
+ } catch (InstantiationException | IllegalAccessException e) {
+ e.printStackTrace();
+ return ok(new Response(elementId, false, e.getMessage()));
}
- InvocableDeclarer declarer = (InvocableDeclarer) getDeclarerById(elementId);
-
- if (declarer != null) {
- //String runningInstanceId = getInstanceId(graph.getElementId(), elementId);
- String runningInstanceId = graph.getDeploymentRunningInstanceId();
- RunningInstances.INSTANCE.add(runningInstanceId, graph, declarer.getClass().newInstance());
- Response resp = RunningInstances.INSTANCE.getInvocation(runningInstanceId).invokeRuntime(graph);
- return ok(resp);
- }
- } catch (InstantiationException | IllegalAccessException e) {
- e.printStackTrace();
- return ok(new Response(elementId, false, e.getMessage()));
+ return ok(new Response(elementId, false, "Could not find the element with id: " + elementId));
}
-
- return ok(new Response(elementId, false, "Could not find the element with id: " + elementId));
+ return ok(new Response(elementId, true, "Already running element with id: " + elementId));
}
@POST
@@ -147,5 +151,9 @@ public abstract class InvocablePipelineElementResource<I extends InvocableStream
private Boolean isDebug() {
return "true".equals(System.getenv("SP_DEBUG"));
}
+
+ private Boolean alreadyRunning(I graph) {
+ return RunningInstances.INSTANCE.getInvocation(graph.getDeploymentRunningInstanceId()) != null;
+ }
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
index 2a3890f..c62d9c5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.node.controller.container.api.NodeControllerResour
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.node.controller.container.management.node.NodeManager;
import org.apache.streampipes.node.controller.container.management.janitor.JanitorManager;
+import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +32,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
+import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Collections;
@@ -58,7 +60,7 @@ public class NodeControllerInit {
LOG.info("Load node info description");
NodeManager.getInstance().init();
- LOG.info("Register node controller at backend");
+ LOG.info("Register node controller at node management");
boolean success = NodeManager.getInstance().register();
if (success) {
@@ -75,6 +77,11 @@ public class NodeControllerInit {
} else throw new SpRuntimeException("Could not register node controller at backend");
}
+ @PostConstruct
+ public void init() {
+ InvocableElementManager.getInstance().invokePipelineElementsOnSystemRebootOrRestart();
+ }
+
@PreDestroy
public void onExit(){
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
index 7091646..934f917 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
@@ -17,6 +17,8 @@
*/
package org.apache.streampipes.node.controller.container.management;
+import java.util.List;
+
public interface RunningInstances<T> {
void add(String id, T value);
@@ -25,5 +27,7 @@ public interface RunningInstances<T> {
T get(String id);
+ List<T> getAll();
+
void remove(String id);
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
index dac74c2..472f38d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
@@ -26,14 +26,17 @@ import org.apache.streampipes.model.message.NotificationType;
import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.model.message.SuccessMessage;
import org.apache.streampipes.model.node.*;
+import org.apache.streampipes.model.node.container.DeploymentContainer;
import org.apache.streampipes.model.node.container.DockerContainer;
import org.apache.streampipes.model.node.meta.GeoLocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.List;
public class NodeManager {
private static final Logger LOG = LoggerFactory.getLogger(NodeManager.class.getCanonicalName());
@@ -130,6 +133,10 @@ public class NodeManager {
}
}
+ public List<DeploymentContainer> getRegisteredContainer() {
+ return this.nodeInfo.getRegisteredContainers();
+ }
+
// Interactions with core
public boolean register() {
@@ -138,7 +145,7 @@ public class NodeManager {
String body = JacksonSerializer.getObjectMapper().writeValueAsString(this.nodeInfo);
String endpoint = generateRegistrationEndpoint();
- LOG.info("Trying to register node at node management: " + endpoint);
+ LOG.info("Trying to register node controller at node management: " + endpoint);
while (!connected) {
connected = post(endpoint, body);
@@ -151,7 +158,7 @@ public class NodeManager {
}
}
}
- LOG.info("Successfully registered node at backend");
+ LOG.info("Successfully registered node controller at node management");
} catch (IOException e) {
e.printStackTrace();
}
@@ -178,7 +185,7 @@ public class NodeManager {
}
}
}
- LOG.info("Successfully registered node at backend");
+ LOG.info("Successfully synced node controller with node management");
} catch (IOException e) {
e.printStackTrace();
}
@@ -191,7 +198,7 @@ public class NodeManager {
try {
String endpoint = generateVersionEndpoint();
- LOG.info("Trying to fetch StreamPipes version from core: " + endpoint);
+ LOG.info("Trying to retrieve StreamPipes version from backend: " + endpoint);
while (!connected) {
Response response = get(endpoint);
@@ -210,7 +217,7 @@ public class NodeManager {
}
}
}
- LOG.info("Successfully registered node at backend");
+ LOG.info("Successfully retrieved StreamPipes version from backend");
} catch (IOException e) {
e.printStackTrace();
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
index 9b2e3a0..f1fdb63 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.node.controller.container.storage.MapDBImpl;
import java.io.File;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public enum RunningContainerInstances implements RunningInstances<DockerContainer> {
@@ -50,6 +51,11 @@ public enum RunningContainerInstances implements RunningInstances<DockerContaine
}
@Override
+ public List<DockerContainer> getAll() {
+ return mapDB.retrieveAll();
+ }
+
+ @Override
public void remove(String id) {
mapDB.delete(id);
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index e15fd5c..04cf003 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -82,6 +82,7 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
public void register(InvocableRegistration registration) {
registerAtConsul(registration);
updateAndSyncNodeInfoDescription(registration);
+ invokePipelineElementsOnSystemRebootOrRestart();
LOG.info("Successfully registered pipeline element container");
}
@@ -149,6 +150,24 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
return response;
}
+ public void invokePipelineElementsOnSystemRebootOrRestart() {
+ LOG.info("Checking for pipeline elements to be started ...");
+ getAllInvocables()
+ .forEach(graph -> {
+ Response status = InvocableElementManager.getInstance().invoke(graph);
+ if (status.isSuccess()) {
+ if (status.getOptionalMessage().isEmpty()) {
+ LOG.info("Pipeline element successfully restarted {}", status.getElementId());
+ } else {
+ LOG.info("Pipeline element already running {}", status.getElementId());
+ }
+ } else {
+ LOG.error("Pipeline element could not be restarted - are the pipeline element containers " +
+ "running? {}", status.getElementId());
+ }
+ });
+ }
+
private EventProducer getReconfigurationEventProducerFromInvocable(InvocableStreamPipesEntity graph) {
TransportProtocol tp = getReconfigurationTransportProtocol(graph);
EventProducer pub;
@@ -189,6 +208,10 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
}
}
+ private List<InvocableStreamPipesEntity> getAllInvocables() {
+ return RunningInvocableInstances.INSTANCE.getAll();
+ }
+
private NodeInfoDescription getNodeInfoDescription() {
return NodeManager.getInstance().retrieveNodeInfoDescription();
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
index bfa68a1..6c7d923 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.node.controller.container.storage.MapDBImpl;
import org.apache.streampipes.node.controller.container.management.RunningInstances;
import java.io.File;
+import java.util.List;
public enum RunningInvocableInstances implements RunningInstances<InvocableStreamPipesEntity> {
INSTANCE;
@@ -48,7 +49,13 @@ public enum RunningInvocableInstances implements RunningInstances<InvocableStrea
}
@Override
+ public List<InvocableStreamPipesEntity> getAll() {
+ return mapDB.retrieveAll();
+ }
+
+ @Override
public void remove(String id) {
mapDB.delete(id);
}
+
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index 085ed82..63baf7b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -44,6 +44,11 @@ public enum RunningRelayInstances implements RunningInstances<Map<String,EventRe
}
@Override
+ public List<Map<String, EventRelay>> getAll() {
+ return new ArrayList<>(runningInstances.values());
+ }
+
+ @Override
public void remove(String id) {
runningInstances.remove(id);
}
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
index c588487..0937366 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
@@ -24,6 +24,8 @@ public interface CRUDStorage {
<T>T retrieve(String id);
+ <T> List<T> retrieveAll();
+
<T> void update(String id, T value);
void delete(String id);
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
index 8483fbd..f07302c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
@@ -22,7 +22,9 @@ import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import java.io.File;
+import java.util.List;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
public class MapDBImpl implements CRUDStorage {
@@ -58,6 +60,11 @@ public class MapDBImpl implements CRUDStorage {
}
@Override
+ public <T> List<T> retrieveAll() {
+ return map.values().stream().map(v -> (T) v).collect(Collectors.toList());
+ }
+
+ @Override
public <T> void update(String id, T value) {
map.put(id, value);
}