You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/11/02 09:46:52 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-613] Add check if adapter should be started on service restart

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 84814ed1e [STREAMPIPES-613] Add check if adapter should be started on service restart
84814ed1e is described below

commit 84814ed1e00a6c39ef49829095f76f4d473a1e64
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Nov 2 10:46:39 2022 +0100

    [STREAMPIPES-613] Add check if adapter should be started on service restart
---
 .../master/health/AdapterHealthCheck.java          | 194 +++++++++++----------
 .../master/health/AdapterHealthCheckTest.java      |  41 ++++-
 .../init/ConnectWorkerRegistrationService.java     |   3 +-
 3 files changed, 140 insertions(+), 98 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 132b7693e..689978fde 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -27,103 +27,111 @@ import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class AdapterHealthCheck {
 
-    private IAdapterStorage adapterStorage;
-    private AdapterMasterManagement adapterMasterManagement;
-
-    public AdapterHealthCheck() {
-        this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
-        this.adapterMasterManagement = new AdapterMasterManagement();
-    }
-
-    public AdapterHealthCheck(IAdapterStorage adapterStorage,
-                              AdapterMasterManagement adapterMasterManagement) {
-        this.adapterStorage = adapterStorage;
-        this.adapterMasterManagement = adapterMasterManagement;
-    }
-
-    /**
-     * In this method it is checked which adapters are currently running. Then it calls all workers to validate if the adapter instance is
-     * still running as expected. If the adapter is not running anymore a new worker instance is invoked.
-     */
-    public void checkAndRestoreAdapters() {
-        // Get all adapters
-        Map<String, AdapterDescription> allRunningInstancesAdapterDescriptions = this.getAllRunningInstancesAdapterDescriptions();
-
-        // Get all worker containers that run adapters
-        Map<String, List<AdapterDescription>> groupByWorker = this.getAllWorkersWithAdapters(allRunningInstancesAdapterDescriptions);
-
-        // Get adapters that are not running anymore
-        Map<String, AdapterDescription> allAdaptersToRecover = this.getAdaptersToRecover(groupByWorker, allRunningInstancesAdapterDescriptions);
-
-        // Recover Adapters
-        this.recoverAdapters(allAdaptersToRecover);
-    }
-
-    public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions() {
-        Map<String, AdapterDescription> result = new HashMap<>();
-        List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters();
-        allRunningInstancesAdapterDescription.forEach(adapterDescription -> {
-            result.put(adapterDescription.getElementId(), adapterDescription);
-        });
-
-        return result;
-    }
-
-    public Map<String, List<AdapterDescription>> getAllWorkersWithAdapters(
-            Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
-
-        Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
-        allRunningInstancesAdapterDescription.values().forEach(ad -> {
-            String selectedEndpointUrl = ad.getSelectedEndpointUrl();
-            if (selectedEndpointUrl != null) {
-                if (groupByWorker.containsKey(selectedEndpointUrl)) {
-                    groupByWorker.get(selectedEndpointUrl).add(ad);
-                } else {
-                    List<AdapterDescription> tmp = new ArrayList<>();
-                    tmp.add(ad);
-                    groupByWorker.put(selectedEndpointUrl, tmp);
-                }
-            }
-        });
-
-        return groupByWorker;
-    }
-
-    public Map<String, AdapterDescription> getAdaptersToRecover(
-            Map<String, List<AdapterDescription>> groupByWorker,
-            Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
-        groupByWorker.keySet().forEach(adapterEndpointUrl -> {
-            try {
-                List<AdapterDescription> allRunningInstancesOfOneWorker = WorkerRestClient.getAllRunningAdapterInstanceDescriptions(adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath());
-                allRunningInstancesOfOneWorker.forEach(adapterDescription -> {
-                    allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId());
-                });
-            } catch (AdapterException e) {
-                e.printStackTrace();
-            }
-        });
-
-        return allRunningInstancesAdapterDescription;
-    }
-
-
-    public boolean recoverAdapters(Map<String, AdapterDescription> adaptersToRecover) {
-        for (AdapterDescription adapterDescription : adaptersToRecover.values()) {
-            // Invoke the adapters
-            try {
-                if (adapterDescription instanceof AdapterStreamDescription) {
-                    this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
-                }
-            } catch (AdapterException e) {
-                e.printStackTrace();
-            }
+  private final IAdapterStorage adapterStorage;
+  private final AdapterMasterManagement adapterMasterManagement;
+
+  public AdapterHealthCheck() {
+    this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
+    this.adapterMasterManagement = new AdapterMasterManagement();
+  }
+
+  public AdapterHealthCheck(IAdapterStorage adapterStorage,
+                            AdapterMasterManagement adapterMasterManagement) {
+    this.adapterStorage = adapterStorage;
+    this.adapterMasterManagement = adapterMasterManagement;
+  }
+
+  /**
+   * In this method it is checked which adapters are currently running.
+   * Then it calls all workers to validate if the adapter instance is
+   * still running as expected. If the adapter is not running anymore a new worker instance is invoked.
+   */
+  public void checkAndRestoreAdapters() {
+    // Get all adapters
+    Map<String, AdapterDescription> allRunningInstancesAdapterDescriptions =
+        this.getAllRunningInstancesAdapterDescriptions();
+
+    // Get all worker containers that run adapters
+    Map<String, List<AdapterDescription>> groupByWorker =
+        this.getAllWorkersWithAdapters(allRunningInstancesAdapterDescriptions);
+
+    // Get adapters that are not running anymore
+    Map<String, AdapterDescription> allAdaptersToRecover =
+        this.getAdaptersToRecover(groupByWorker, allRunningInstancesAdapterDescriptions);
+
+    // Recover Adapters
+    this.recoverAdapters(allAdaptersToRecover);
+  }
+
+  public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions() {
+    Map<String, AdapterDescription> result = new HashMap<>();
+    List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters();
+    allRunningInstancesAdapterDescription.forEach(adapterDescription ->
+        result.put(adapterDescription.getElementId(), adapterDescription));
+
+    return result;
+  }
+
+  public Map<String, List<AdapterDescription>> getAllWorkersWithAdapters(
+      Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
+
+    Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
+    allRunningInstancesAdapterDescription.values().forEach(ad -> {
+      String selectedEndpointUrl = ad.getSelectedEndpointUrl();
+      if (selectedEndpointUrl != null) {
+        if (groupByWorker.containsKey(selectedEndpointUrl)) {
+          groupByWorker.get(selectedEndpointUrl).add(ad);
+        } else {
+          List<AdapterDescription> tmp = new ArrayList<>();
+          tmp.add(ad);
+          groupByWorker.put(selectedEndpointUrl, tmp);
         }
-
-        return true;
+      }
+    });
+
+    return groupByWorker;
+  }
+
+  public Map<String, AdapterDescription> getAdaptersToRecover(
+      Map<String, List<AdapterDescription>> groupByWorker,
+      Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
+    groupByWorker.keySet().forEach(adapterEndpointUrl -> {
+      try {
+        List<AdapterDescription> allRunningInstancesOfOneWorker =
+            WorkerRestClient.getAllRunningAdapterInstanceDescriptions(
+                adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath());
+        allRunningInstancesOfOneWorker.forEach(adapterDescription ->
+            allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId()));
+      } catch (AdapterException e) {
+        e.printStackTrace();
+      }
+    });
+
+    return allRunningInstancesAdapterDescription;
+  }
+
+
+  public void recoverAdapters(Map<String, AdapterDescription> adaptersToRecover) {
+    for (AdapterDescription adapterDescription : adaptersToRecover.values()) {
+      // Invoke all adapters that were running when the adapter container was stopped
+      try {
+        if (adapterDescription instanceof AdapterStreamDescription) {
+          if (((AdapterStreamDescription) adapterDescription).isRunning()) {
+            this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
+          }
+        }
+      } catch (AdapterException e) {
+        e.printStackTrace();
+      }
     }
 
+  }
+
 }
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
index 77185fa8b..a74de380c 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
@@ -17,26 +17,30 @@
  */
 package org.apache.streampipes.connect.container.master.health;
 
+import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
 import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
 import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
 public class AdapterHealthCheckTest {
 
-    private String testElementId = "testElementId";
-    private String selectedEndpointUrl = "http://test.de";
+    private final String testElementId = "testElementId";
 
     @Test
     public void getAllRunningInstancesAdapterDescriptions() {
@@ -61,10 +65,39 @@ public class AdapterHealthCheckTest {
 
         assertNotNull(result);
         assertEquals(1, result.keySet().size());
+        String selectedEndpointUrl = "http://test.de";
         assertEquals(1, result.get(selectedEndpointUrl).size());
         assertEquals(getAdapterDescriptionList().get(0), result.get(selectedEndpointUrl).get(0));
     }
 
+    @Test
+    public void recoverRunningAdaptersTest() throws AdapterException {
+        AdapterMasterManagement adapterMasterManagementMock = mock(AdapterMasterManagement.class);
+        AdapterHealthCheck adapterHealthCheck = new AdapterHealthCheck(null, adapterMasterManagementMock);
+
+        adapterHealthCheck.recoverAdapters(getAdaptersToRecoverData(true));
+
+        verify(adapterMasterManagementMock, times(1)).startStreamAdapter(any());
+    }
+
+
+    @Test
+    public void recoverStoppedAdaptersTest() throws AdapterException {
+        AdapterMasterManagement adapterMasterManagementMock = mock(AdapterMasterManagement.class);
+        AdapterHealthCheck adapterHealthCheck = new AdapterHealthCheck(null, adapterMasterManagementMock);
+
+        adapterHealthCheck.recoverAdapters(getAdaptersToRecoverData(false));
+
+        verify(adapterMasterManagementMock, times(0)).startStreamAdapter(any());
+    }
+
+    private Map<String, AdapterDescription> getAdaptersToRecoverData(boolean isRunning) {
+        Map<String, AdapterDescription> adaptersToRecover = new HashMap<>();
+        AdapterStreamDescription ad = SpecificDataStreamAdapterBuilder.create("").build();
+        ad.setRunning(isRunning);
+        adaptersToRecover.put("", ad);
+        return adaptersToRecover;
+    }
 
     private List<AdapterDescription> getAdapterDescriptionList() {
 
@@ -74,7 +107,7 @@ public class AdapterHealthCheckTest {
                 .build();
         adapterStreamDescription.setSelectedEndpointUrl("http://test.de");
 
-        return Arrays.asList(adapterStreamDescription);
+        return List.of(adapterStreamDescription);
     }
 
     private Map<String, AdapterDescription> getAdapterDescriptionMap() {
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
index fd5095199..aec9e07a8 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
@@ -31,7 +31,8 @@ public class ConnectWorkerRegistrationService {
 
     while (!connected) {
 
-      connected = MasterRestClient.register(new ConnectWorkerDescriptionProvider().getContainerDescription(serviceDef.getServiceGroup()));
+      connected = MasterRestClient.register(
+          new ConnectWorkerDescriptionProvider().getContainerDescription(serviceDef.getServiceGroup()));
 
       if (connected) {
         LOG.info("Successfully connected to master. Worker is now running.");