You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/11/02 09:46:52 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-613] Add check if adapter should be started on service restart
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 84814ed1e [STREAMPIPES-613] Add check if adapter should be started on service restart
84814ed1e is described below
commit 84814ed1e00a6c39ef49829095f76f4d473a1e64
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Nov 2 10:46:39 2022 +0100
[STREAMPIPES-613] Add check if adapter should be started on service restart
---
.../master/health/AdapterHealthCheck.java | 194 +++++++++++----------
.../master/health/AdapterHealthCheckTest.java | 41 ++++-
.../init/ConnectWorkerRegistrationService.java | 3 +-
3 files changed, 140 insertions(+), 98 deletions(-)
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 132b7693e..689978fde 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -27,103 +27,111 @@ import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class AdapterHealthCheck {
- private IAdapterStorage adapterStorage;
- private AdapterMasterManagement adapterMasterManagement;
-
- public AdapterHealthCheck() {
- this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
- this.adapterMasterManagement = new AdapterMasterManagement();
- }
-
- public AdapterHealthCheck(IAdapterStorage adapterStorage,
- AdapterMasterManagement adapterMasterManagement) {
- this.adapterStorage = adapterStorage;
- this.adapterMasterManagement = adapterMasterManagement;
- }
-
- /**
- * In this method it is checked which adapters are currently running. Then it calls all workers to validate if the adapter instance is
- * still running as expected. If the adapter is not running anymore a new worker instance is invoked.
- */
- public void checkAndRestoreAdapters() {
- // Get all adapters
- Map<String, AdapterDescription> allRunningInstancesAdapterDescriptions = this.getAllRunningInstancesAdapterDescriptions();
-
- // Get all worker containers that run adapters
- Map<String, List<AdapterDescription>> groupByWorker = this.getAllWorkersWithAdapters(allRunningInstancesAdapterDescriptions);
-
- // Get adapters that are not running anymore
- Map<String, AdapterDescription> allAdaptersToRecover = this.getAdaptersToRecover(groupByWorker, allRunningInstancesAdapterDescriptions);
-
- // Recover Adapters
- this.recoverAdapters(allAdaptersToRecover);
- }
-
- public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions() {
- Map<String, AdapterDescription> result = new HashMap<>();
- List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters();
- allRunningInstancesAdapterDescription.forEach(adapterDescription -> {
- result.put(adapterDescription.getElementId(), adapterDescription);
- });
-
- return result;
- }
-
- public Map<String, List<AdapterDescription>> getAllWorkersWithAdapters(
- Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
-
- Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
- allRunningInstancesAdapterDescription.values().forEach(ad -> {
- String selectedEndpointUrl = ad.getSelectedEndpointUrl();
- if (selectedEndpointUrl != null) {
- if (groupByWorker.containsKey(selectedEndpointUrl)) {
- groupByWorker.get(selectedEndpointUrl).add(ad);
- } else {
- List<AdapterDescription> tmp = new ArrayList<>();
- tmp.add(ad);
- groupByWorker.put(selectedEndpointUrl, tmp);
- }
- }
- });
-
- return groupByWorker;
- }
-
- public Map<String, AdapterDescription> getAdaptersToRecover(
- Map<String, List<AdapterDescription>> groupByWorker,
- Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
- groupByWorker.keySet().forEach(adapterEndpointUrl -> {
- try {
- List<AdapterDescription> allRunningInstancesOfOneWorker = WorkerRestClient.getAllRunningAdapterInstanceDescriptions(adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath());
- allRunningInstancesOfOneWorker.forEach(adapterDescription -> {
- allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId());
- });
- } catch (AdapterException e) {
- e.printStackTrace();
- }
- });
-
- return allRunningInstancesAdapterDescription;
- }
-
-
- public boolean recoverAdapters(Map<String, AdapterDescription> adaptersToRecover) {
- for (AdapterDescription adapterDescription : adaptersToRecover.values()) {
- // Invoke the adapters
- try {
- if (adapterDescription instanceof AdapterStreamDescription) {
- this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
- }
- } catch (AdapterException e) {
- e.printStackTrace();
- }
+ private final IAdapterStorage adapterStorage;
+ private final AdapterMasterManagement adapterMasterManagement;
+
+ public AdapterHealthCheck() {
+ this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
+ this.adapterMasterManagement = new AdapterMasterManagement();
+ }
+
+ public AdapterHealthCheck(IAdapterStorage adapterStorage,
+ AdapterMasterManagement adapterMasterManagement) {
+ this.adapterStorage = adapterStorage;
+ this.adapterMasterManagement = adapterMasterManagement;
+ }
+
+ /**
+ * In this method it is checked which adapters are currently running.
+ * Then it calls all workers to validate if the adapter instance is
+ * still running as expected. If the adapter is not running anymore a new worker instance is invoked.
+ */
+ public void checkAndRestoreAdapters() {
+ // Get all adapters
+ Map<String, AdapterDescription> allRunningInstancesAdapterDescriptions =
+ this.getAllRunningInstancesAdapterDescriptions();
+
+ // Get all worker containers that run adapters
+ Map<String, List<AdapterDescription>> groupByWorker =
+ this.getAllWorkersWithAdapters(allRunningInstancesAdapterDescriptions);
+
+ // Get adapters that are not running anymore
+ Map<String, AdapterDescription> allAdaptersToRecover =
+ this.getAdaptersToRecover(groupByWorker, allRunningInstancesAdapterDescriptions);
+
+ // Recover Adapters
+ this.recoverAdapters(allAdaptersToRecover);
+ }
+
+ public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions() {
+ Map<String, AdapterDescription> result = new HashMap<>();
+ List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters();
+ allRunningInstancesAdapterDescription.forEach(adapterDescription ->
+ result.put(adapterDescription.getElementId(), adapterDescription));
+
+ return result;
+ }
+
+ public Map<String, List<AdapterDescription>> getAllWorkersWithAdapters(
+ Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
+
+ Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
+ allRunningInstancesAdapterDescription.values().forEach(ad -> {
+ String selectedEndpointUrl = ad.getSelectedEndpointUrl();
+ if (selectedEndpointUrl != null) {
+ if (groupByWorker.containsKey(selectedEndpointUrl)) {
+ groupByWorker.get(selectedEndpointUrl).add(ad);
+ } else {
+ List<AdapterDescription> tmp = new ArrayList<>();
+ tmp.add(ad);
+ groupByWorker.put(selectedEndpointUrl, tmp);
}
-
- return true;
+ }
+ });
+
+ return groupByWorker;
+ }
+
+ public Map<String, AdapterDescription> getAdaptersToRecover(
+ Map<String, List<AdapterDescription>> groupByWorker,
+ Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
+ groupByWorker.keySet().forEach(adapterEndpointUrl -> {
+ try {
+ List<AdapterDescription> allRunningInstancesOfOneWorker =
+ WorkerRestClient.getAllRunningAdapterInstanceDescriptions(
+ adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath());
+ allRunningInstancesOfOneWorker.forEach(adapterDescription ->
+ allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId()));
+ } catch (AdapterException e) {
+ e.printStackTrace();
+ }
+ });
+
+ return allRunningInstancesAdapterDescription;
+ }
+
+
+ public void recoverAdapters(Map<String, AdapterDescription> adaptersToRecover) {
+ for (AdapterDescription adapterDescription : adaptersToRecover.values()) {
+ // Invoke all adapters that were running when the adapter container was stopped
+ try {
+ if (adapterDescription instanceof AdapterStreamDescription) {
+ if (((AdapterStreamDescription) adapterDescription).isRunning()) {
+ this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
+ }
+ }
+ } catch (AdapterException e) {
+ e.printStackTrace();
+ }
}
+ }
+
}
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
index 77185fa8b..a74de380c 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
@@ -17,26 +17,30 @@
*/
package org.apache.streampipes.connect.container.master.health;
+import org.apache.streampipes.connect.api.exception.AdapterException;
+import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
import org.junit.Test;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
public class AdapterHealthCheckTest {
- private String testElementId = "testElementId";
- private String selectedEndpointUrl = "http://test.de";
+ private final String testElementId = "testElementId";
@Test
public void getAllRunningInstancesAdapterDescriptions() {
@@ -61,10 +65,39 @@ public class AdapterHealthCheckTest {
assertNotNull(result);
assertEquals(1, result.keySet().size());
+ String selectedEndpointUrl = "http://test.de";
assertEquals(1, result.get(selectedEndpointUrl).size());
assertEquals(getAdapterDescriptionList().get(0), result.get(selectedEndpointUrl).get(0));
}
+ @Test
+ public void recoverRunningAdaptersTest() throws AdapterException {
+ AdapterMasterManagement adapterMasterManagementMock = mock(AdapterMasterManagement.class);
+ AdapterHealthCheck adapterHealthCheck = new AdapterHealthCheck(null, adapterMasterManagementMock);
+
+ adapterHealthCheck.recoverAdapters(getAdaptersToRecoverData(true));
+
+ verify(adapterMasterManagementMock, times(1)).startStreamAdapter(any());
+ }
+
+
+ @Test
+ public void recoverStoppedAdaptersTest() throws AdapterException {
+ AdapterMasterManagement adapterMasterManagementMock = mock(AdapterMasterManagement.class);
+ AdapterHealthCheck adapterHealthCheck = new AdapterHealthCheck(null, adapterMasterManagementMock);
+
+ adapterHealthCheck.recoverAdapters(getAdaptersToRecoverData(false));
+
+ verify(adapterMasterManagementMock, times(0)).startStreamAdapter(any());
+ }
+
+ private Map<String, AdapterDescription> getAdaptersToRecoverData(boolean isRunning) {
+ Map<String, AdapterDescription> adaptersToRecover = new HashMap<>();
+ AdapterStreamDescription ad = SpecificDataStreamAdapterBuilder.create("").build();
+ ad.setRunning(isRunning);
+ adaptersToRecover.put("", ad);
+ return adaptersToRecover;
+ }
private List<AdapterDescription> getAdapterDescriptionList() {
@@ -74,7 +107,7 @@ public class AdapterHealthCheckTest {
.build();
adapterStreamDescription.setSelectedEndpointUrl("http://test.de");
- return Arrays.asList(adapterStreamDescription);
+ return List.of(adapterStreamDescription);
}
private Map<String, AdapterDescription> getAdapterDescriptionMap() {
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
index fd5095199..aec9e07a8 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
@@ -31,7 +31,8 @@ public class ConnectWorkerRegistrationService {
while (!connected) {
- connected = MasterRestClient.register(new ConnectWorkerDescriptionProvider().getContainerDescription(serviceDef.getServiceGroup()));
+ connected = MasterRestClient.register(
+ new ConnectWorkerDescriptionProvider().getContainerDescription(serviceDef.getServiceGroup()));
if (connected) {
LOG.info("Successfully connected to master. Worker is now running.");