You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/17 09:17:23 UTC

[incubator-streampipes] 02/02: Add auto-restart for pipeline elments, adapters, relays in case of system reboot or container restart

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 45b1f3c0271b636989131b2a1ac4f0711fc38f97
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed Mar 17 10:14:30 2021 +0100

    Add auto-restart for pipeline elments, adapters, relays in case of system reboot or container restart
---
 .../api/InvocablePipelineElementResource.java      | 42 +++++++++++++---------
 .../controller/container/NodeControllerInit.java   |  9 ++++-
 .../container/management/RunningInstances.java     |  4 +++
 .../container/management/node/NodeManager.java     | 17 ++++++---
 .../orchestrator/RunningContainerInstances.java    |  6 ++++
 .../management/pe/InvocableElementManager.java     | 23 ++++++++++++
 .../management/pe/RunningInvocableInstances.java   |  7 ++++
 .../management/relay/RunningRelayInstances.java    |  5 +++
 .../controller/container/storage/CRUDStorage.java  |  2 ++
 .../controller/container/storage/MapDBImpl.java    |  7 ++++
 10 files changed, 99 insertions(+), 23 deletions(-)

diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
index e612c16..a56a7d3 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java
@@ -55,26 +55,30 @@ public abstract class InvocablePipelineElementResource<I extends InvocableStream
     @JacksonSerialized
     public javax.ws.rs.core.Response invokeRuntime(@PathParam("elementId") String elementId, I graph) {
 
-        try {
-            if (isDebug()) {
-              graph = createGroundingDebugInformation(graph);
+        // in case element container is still running when backend/node controller crashes
+        if (!alreadyRunning(graph)) {
+            try {
+                if (isDebug()) {
+                    graph = createGroundingDebugInformation(graph);
+                }
+
+                InvocableDeclarer declarer = (InvocableDeclarer) getDeclarerById(elementId);
+
+                if (declarer != null) {
+                    //String runningInstanceId = getInstanceId(graph.getElementId(), elementId);
+                    String runningInstanceId = graph.getDeploymentRunningInstanceId();
+                    RunningInstances.INSTANCE.add(runningInstanceId, graph, declarer.getClass().newInstance());
+                    Response resp = RunningInstances.INSTANCE.getInvocation(runningInstanceId).invokeRuntime(graph);
+                    return ok(resp);
+                }
+            } catch (InstantiationException | IllegalAccessException e) {
+                e.printStackTrace();
+                return ok(new Response(elementId, false, e.getMessage()));
             }
 
-            InvocableDeclarer declarer = (InvocableDeclarer) getDeclarerById(elementId);
-
-            if (declarer != null) {
-                //String runningInstanceId = getInstanceId(graph.getElementId(), elementId);
-                String runningInstanceId = graph.getDeploymentRunningInstanceId();
-                RunningInstances.INSTANCE.add(runningInstanceId, graph, declarer.getClass().newInstance());
-                Response resp = RunningInstances.INSTANCE.getInvocation(runningInstanceId).invokeRuntime(graph);
-                return ok(resp);
-            }
-        } catch (InstantiationException | IllegalAccessException e) {
-            e.printStackTrace();
-            return ok(new Response(elementId, false, e.getMessage()));
+            return ok(new Response(elementId, false, "Could not find the element with id: " + elementId));
         }
-
-        return ok(new Response(elementId, false, "Could not find the element with id: " + elementId));
+        return ok(new Response(elementId, true, "Already running element with id: " + elementId));
     }
 
     @POST
@@ -147,5 +151,9 @@ public abstract class InvocablePipelineElementResource<I extends InvocableStream
     private Boolean isDebug() {
         return "true".equals(System.getenv("SP_DEBUG"));
     }
+
+    private Boolean alreadyRunning(I graph) {
+        return RunningInstances.INSTANCE.getInvocation(graph.getDeploymentRunningInstanceId()) != null;
+    }
 }
 
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
index 2a3890f..c62d9c5 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/NodeControllerInit.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.node.controller.container.api.NodeControllerResour
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.node.controller.container.management.node.NodeManager;
 import org.apache.streampipes.node.controller.container.management.janitor.JanitorManager;
+import org.apache.streampipes.node.controller.container.management.pe.InvocableElementManager;
 import org.apache.streampipes.node.controller.container.management.resource.ResourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,6 +32,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.Collections;
 
@@ -58,7 +60,7 @@ public class NodeControllerInit {
         LOG.info("Load node info description");
         NodeManager.getInstance().init();
 
-        LOG.info("Register node controller at backend");
+        LOG.info("Register node controller at node management");
         boolean success = NodeManager.getInstance().register();
 
         if (success) {
@@ -75,6 +77,11 @@ public class NodeControllerInit {
         } else throw new SpRuntimeException("Could not register node controller at backend");
     }
 
+    @PostConstruct
+    public void init() {
+        InvocableElementManager.getInstance().invokePipelineElementsOnSystemRebootOrRestart();
+    }
+
     @PreDestroy
     public void onExit(){
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
index 7091646..934f917 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/RunningInstances.java
@@ -17,6 +17,8 @@
  */
 package org.apache.streampipes.node.controller.container.management;
 
+import java.util.List;
+
 public interface RunningInstances<T> {
 
     void add(String id, T value);
@@ -25,5 +27,7 @@ public interface RunningInstances<T> {
 
     T get(String id);
 
+    List<T> getAll();
+
     void remove(String id);
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
index dac74c2..472f38d 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/node/NodeManager.java
@@ -26,14 +26,17 @@ import org.apache.streampipes.model.message.NotificationType;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.message.SuccessMessage;
 import org.apache.streampipes.model.node.*;
+import org.apache.streampipes.model.node.container.DeploymentContainer;
 import org.apache.streampipes.model.node.container.DockerContainer;
 import org.apache.streampipes.model.node.meta.GeoLocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.node.controller.container.config.NodeControllerConfig;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
 
 public class NodeManager {
     private static final Logger LOG = LoggerFactory.getLogger(NodeManager.class.getCanonicalName());
@@ -130,6 +133,10 @@ public class NodeManager {
         }
     }
 
+    public List<DeploymentContainer> getRegisteredContainer() {
+        return this.nodeInfo.getRegisteredContainers();
+    }
+
     // Interactions with core
 
     public boolean register() {
@@ -138,7 +145,7 @@ public class NodeManager {
             String body = JacksonSerializer.getObjectMapper().writeValueAsString(this.nodeInfo);
             String endpoint = generateRegistrationEndpoint();
 
-            LOG.info("Trying to register node at node management: " + endpoint);
+            LOG.info("Trying to register node controller at node management: " + endpoint);
 
             while (!connected) {
                 connected = post(endpoint, body);
@@ -151,7 +158,7 @@ public class NodeManager {
                     }
                 }
             }
-            LOG.info("Successfully registered node at backend");
+            LOG.info("Successfully registered node controller at node management");
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -178,7 +185,7 @@ public class NodeManager {
                     }
                 }
             }
-            LOG.info("Successfully registered node at backend");
+            LOG.info("Successfully synced node controller with node management");
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -191,7 +198,7 @@ public class NodeManager {
         try {
             String endpoint = generateVersionEndpoint();
 
-            LOG.info("Trying to fetch StreamPipes version from core: " + endpoint);
+            LOG.info("Trying to retrieve StreamPipes version from backend: " + endpoint);
 
             while (!connected) {
                 Response response = get(endpoint);
@@ -210,7 +217,7 @@ public class NodeManager {
                     }
                 }
             }
-            LOG.info("Successfully registered node at backend");
+            LOG.info("Successfully retrieved StreamPipes version from backend");
         } catch (IOException e) {
             e.printStackTrace();
         }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
index 9b2e3a0..f1fdb63 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/orchestrator/RunningContainerInstances.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.node.controller.container.storage.MapDBImpl;
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public enum RunningContainerInstances implements RunningInstances<DockerContainer> {
@@ -50,6 +51,11 @@ public enum RunningContainerInstances implements RunningInstances<DockerContaine
     }
 
     @Override
+    public List<DockerContainer> getAll() {
+        return mapDB.retrieveAll();
+    }
+
+    @Override
     public void remove(String id) {
         mapDB.delete(id);
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
index e15fd5c..04cf003 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/InvocableElementManager.java
@@ -82,6 +82,7 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
     public void register(InvocableRegistration registration) {
         registerAtConsul(registration);
         updateAndSyncNodeInfoDescription(registration);
+        invokePipelineElementsOnSystemRebootOrRestart();
         LOG.info("Successfully registered pipeline element container");
     }
 
@@ -149,6 +150,24 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
         return response;
     }
 
+    public void invokePipelineElementsOnSystemRebootOrRestart() {
+        LOG.info("Checking for pipeline elements to be started ...");
+        getAllInvocables()
+                .forEach(graph -> {
+                    Response status = InvocableElementManager.getInstance().invoke(graph);
+                    if (status.isSuccess()) {
+                        if (status.getOptionalMessage().isEmpty()) {
+                            LOG.info("Pipeline element successfully restarted {}", status.getElementId());
+                        } else {
+                            LOG.info("Pipeline element already running {}", status.getElementId());
+                        }
+                    } else {
+                        LOG.error("Pipeline element could not be restarted - are the pipeline element containers " +
+                                "running? {}", status.getElementId());
+                    }
+                });
+    }
+
     private EventProducer getReconfigurationEventProducerFromInvocable(InvocableStreamPipesEntity graph) {
         TransportProtocol tp = getReconfigurationTransportProtocol(graph);
         EventProducer pub;
@@ -189,6 +208,10 @@ public class InvocableElementManager implements PipelineElementLifeCycle {
         }
     }
 
+    private List<InvocableStreamPipesEntity> getAllInvocables() {
+        return RunningInvocableInstances.INSTANCE.getAll();
+    }
+
     private NodeInfoDescription getNodeInfoDescription() {
         return NodeManager.getInstance().retrieveNodeInfoDescription();
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
index bfa68a1..6c7d923 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/pe/RunningInvocableInstances.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.node.controller.container.storage.MapDBImpl;
 import org.apache.streampipes.node.controller.container.management.RunningInstances;
 
 import java.io.File;
+import java.util.List;
 
 public enum RunningInvocableInstances implements RunningInstances<InvocableStreamPipesEntity> {
     INSTANCE;
@@ -48,7 +49,13 @@ public enum RunningInvocableInstances implements RunningInstances<InvocableStrea
     }
 
     @Override
+    public List<InvocableStreamPipesEntity> getAll() {
+        return mapDB.retrieveAll();
+    }
+
+    @Override
     public void remove(String id) {
         mapDB.delete(id);
     }
+
 }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
index 085ed82..63baf7b 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/management/relay/RunningRelayInstances.java
@@ -44,6 +44,11 @@ public enum RunningRelayInstances implements RunningInstances<Map<String,EventRe
     }
 
     @Override
+    public List<Map<String, EventRelay>> getAll() {
+        return new ArrayList<>(runningInstances.values());
+    }
+
+    @Override
     public void remove(String id) {
         runningInstances.remove(id);
     }
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
index c588487..0937366 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/CRUDStorage.java
@@ -24,6 +24,8 @@ public interface CRUDStorage {
 
     <T>T retrieve(String id);
 
+    <T> List<T> retrieveAll();
+
     <T> void update(String id, T value);
 
     void delete(String id);
diff --git a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
index 8483fbd..f07302c 100644
--- a/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
+++ b/streampipes-node-controller-container/src/main/java/org/apache/streampipes/node/controller/container/storage/MapDBImpl.java
@@ -22,7 +22,9 @@ import org.mapdb.DBMaker;
 import org.mapdb.Serializer;
 
 import java.io.File;
+import java.util.List;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 
 public class MapDBImpl implements CRUDStorage {
 
@@ -58,6 +60,11 @@ public class MapDBImpl implements CRUDStorage {
     }
 
     @Override
+    public <T> List<T> retrieveAll() {
+        return map.values().stream().map(v -> (T) v).collect(Collectors.toList());
+    }
+
+    @Override
     public <T> void update(String id, T value) {
         map.put(id, value);
     }