You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/09 05:42:03 UTC

[incubator-streampipes] branch STREAMPIPES-438 updated: Working on recovery for adapters

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-438 by this push:
     new a721058  Working on recovery for adapters
a721058 is described below

commit a7210589305c76e3fd5c93abd33aa66c9ef3424a
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Sat Oct 9 07:41:40 2021 +0200

    Working on recovery for adapters
---
 .../master/health/AdapterHealthCheck.java          | 28 ++++++++++++++++++----
 .../master/management/WorkerRestClient.java        | 26 +++++++++++++++-----
 .../worker/management/AdapterWorkerManagement.java | 10 ++++++--
 .../worker/rest/AdapterWorkerResource.java         | 15 ++++++++----
 .../management/AdapterWorkerManagementTest.java    | 26 +++++++++-----------
 .../connect/RunningAdapterInstances.java           | 19 +++++++++++----
 .../adapter-description.component.ts               |  9 -------
 7 files changed, 88 insertions(+), 45 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 42577ba..0a6029c 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -20,31 +20,51 @@ package org.apache.streampipes.connect.container.master.health;
 
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
+import org.apache.streampipes.connect.container.master.management.WorkerRestClient;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
 
-import java.util.List;
+import java.util.*;
 
 public class AdapterHealthCheck {
 
     public AdapterHealthCheck() {
     }
 
+    // TODO how can I test this code?
     public void checkAndRestoreAdapters() {
         AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
 
         IAdapterStorage adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
 
+        List<AdapterDescription> allAdapersToRecover = new ArrayList<>();
 
         // Get all adapters
         List<AdapterDescription> allRunningInstancesAdaperDescription = adapterStorage.getAllAdapters();
 
-        for (AdapterDescription adapterDescription : allRunningInstancesAdaperDescription) {
+        // Group the stored adapter descriptions by their selected worker endpoint URL.
+        // computeIfAbsent creates a growable ArrayList per endpoint; the fixed-size list from
+        // Arrays.asList(ad) made add(...) throw UnsupportedOperationException as soon as a
+        // second adapter mapped to the same worker.
+        Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
+        allRunningInstancesAdaperDescription.forEach(ad -> {
+            String selectedEndpointUrl = ad.getSelectedEndpointUrl();
+            groupByWorker.computeIfAbsent(selectedEndpointUrl, url -> new ArrayList<>())
+                    .add(ad);
+        });
 
-        // Ask worker if they are up and running
+        groupByWorker.keySet().forEach(adapterEndpointUrl -> {
+            try {
+                List<AdapterDescription> allRunningInstancesOfOneWorker = WorkerRestClient.getAllRunningAdapterInstanceDescriptions("");
+                // TODO Remove all running adapters from allRunningInstancesAdaperDescription
+            } catch (AdapterException e) {
+                e.printStackTrace();
+            }
+        });
 
-        // If not
+        for (AdapterDescription adapterDescription : allAdapersToRecover) {
+            // TODO how do I know there is a worker to start them?
 
             // Invoke the adapters
             try {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 5798c63..d8eec8e 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -78,15 +78,29 @@ public class WorkerRestClient {
         stopAdapter(adapterSetDescription, url);
     }
 
+    // Fetches the descriptions of all adapter instances a worker reports as running via GET on url.
+    // Throws AdapterException when the request or the parsing of the response fails.
+    public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException {
+        try {
+            logger.info("Requesting all running adapter description instances: " + url);
+            String responseString = Request.Get(url)
+                    .connectTimeout(1000)
+                    .socketTimeout(100000)
+                    .execute().returnContent().asString();
+
+            // NOTE(review): raw List.class leaves the element type to Jackson's defaults - confirm
+            // that the elements really are AdapterDescription instances before relying on them.
+            return JacksonSerializer.getObjectMapper().readValue(responseString, List.class);
+        } catch (IOException e) {
+            logger.error("List of running adapters could not be fetched", e);
+            throw new AdapterException("List of running adapters could not be fetched from url: " + url);
+        }
+    }
+
     public static void startAdapter(String url, AdapterDescription ad) throws AdapterException {
         try {
             logger.info("Trying to start adapter on endpoint: " + url);
 
-            // this ensures that all adapters have a valid uri otherwise the json-ld serializer fails
-//            if (ad.getUri() == null) {
-//                ad.setUri("https://streampipes.org/adapter/" + UUID.randomUUID());
-//            }
-
             String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(ad);
 
             String responseString = Request.Post(url)
@@ -107,7 +121,7 @@ public class WorkerRestClient {
     public static void stopAdapter(AdapterDescription ad,
                                    String url) throws AdapterException {
 
-        // Stop execution of adatper
+        // Stop execution of adapter
         try {
             logger.info("Trying to stopAdapter adapter on endpoint: " + url);
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
index 0e5b06c..94ba63e 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
@@ -28,15 +28,21 @@ import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+
 public class AdapterWorkerManagement {
 
     private static final Logger logger = LoggerFactory.getLogger(AdapterWorkerManagement.class);
 
+    public Collection<AdapterDescription> getAllRunningAdapterInstances() {
+        return RunningAdapterInstances.INSTANCE.getAllRunningAdapterDescriptions();
+    }
+
     public void invokeStreamAdapter(AdapterStreamDescription adapterStreamDescription) throws AdapterException {
 
        IAdapter<?> adapter = AdapterUtils.setAdapter(adapterStreamDescription);
 
-        RunningAdapterInstances.INSTANCE.addAdapter(adapterStreamDescription.getElementId(), adapter);
+        RunningAdapterInstances.INSTANCE.addAdapter(adapterStreamDescription.getElementId(), adapter, adapterStreamDescription);
         adapter.startAdapter();
     }
 
@@ -48,7 +54,7 @@ public class AdapterWorkerManagement {
 
         IAdapter<?> adapter = AdapterUtils.setAdapter(adapterSetDescription);
 
-        RunningAdapterInstances.INSTANCE.addAdapter(adapterSetDescription.getElementId(), adapter);
+        RunningAdapterInstances.INSTANCE.addAdapter(adapterSetDescription.getElementId(), adapter, adapterSetDescription);
 
         adapter.changeEventGrounding(adapterSetDescription.getDataSet().getEventGrounding().getTransportProtocol());
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
index d252a12..a05e749 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
@@ -28,10 +28,7 @@ import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
+import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
@@ -50,6 +47,16 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
         this.adapterManagement = adapterManagement;
     }
 
+    // get all running instances
+    @GET
+    @JacksonSerialized
+    @Path("/running")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getRunningAdapterInstances() {
+        return ok(adapterManagement.getAllRunningAdapterInstances());
+    }
+
+
     @POST
     @JacksonSerialized
     @Path("/stream/invoke")
diff --git a/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java b/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java
index cf579c3..631f3f9 100644
--- a/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java
+++ b/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java
@@ -18,28 +18,24 @@
 
 package org.apache.streampipes.connect.container.worker.management;
 
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 import org.apache.streampipes.connect.RunningAdapterInstances;
 import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.AdapterRegistry;
-import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.model.specific.SpecificDataSetAdapter;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.worker.utils.Utils;
-import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
-import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
-import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
-import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription;
+import org.apache.streampipes.model.connect.adapter.*;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
 
+import static org.junit.Assert.*;
+
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ AdapterRegistry.class })
 @PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
@@ -64,7 +60,7 @@ public class AdapterWorkerManagementTest {
     @Test
     public void stopStreamAdapterSuccess() throws AdapterException {
         TestAdapter testAdapter = getTestAdapterInstance();
-        RunningAdapterInstances.INSTANCE.addAdapter("http://t.de/", testAdapter);
+        RunningAdapterInstances.INSTANCE.addAdapter("http://t.de/", testAdapter, null);
         AdapterWorkerManagement adapterWorkerManagement = new AdapterWorkerManagement();
         adapterWorkerManagement.stopStreamAdapter(Utils.getMinimalStreamAdapter());
 
@@ -92,7 +88,7 @@ public class AdapterWorkerManagementTest {
     public void stopSetAdapterSuccess() throws AdapterException {
         TestAdapter testAdapter = getTestAdapterInstance();
 
-        RunningAdapterInstances.INSTANCE.addAdapter("http://t.de/", testAdapter);
+        RunningAdapterInstances.INSTANCE.addAdapter("http://t.de/", testAdapter, null);
         AdapterWorkerManagement adapterWorkerManagement = new AdapterWorkerManagement();
         adapterWorkerManagement.stopSetAdapter(Utils.getMinimalSetAdapter());
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
index 22baaac..d4cf08d 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
@@ -19,24 +19,33 @@
 package org.apache.streampipes.connect;
 
 import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
 public enum RunningAdapterInstances {
     INSTANCE;
 
-    private final Map<String, IAdapter<?>> runningInstances = new HashMap<>();
+    private final Map<String, IAdapter<?>> runningAdapterInstances = new HashMap<>();
+    private final Map<String, AdapterDescription> runningAdapterDescriptionInstances = new HashMap<>();
 
-    public void addAdapter(String elementId, IAdapter<?> adapter) {
-        runningInstances.put(elementId, adapter);
+    public void addAdapter(String elementId, IAdapter<?> adapter, AdapterDescription adapterDescription) {
+        runningAdapterInstances.put(elementId, adapter);
+        runningAdapterDescriptionInstances.put(elementId, adapterDescription);
     }
 
     public IAdapter<?> removeAdapter(String elementId) {
-        IAdapter<?> result = runningInstances.get(elementId);
-        runningInstances.remove(elementId);
+        IAdapter<?> result = runningAdapterInstances.get(elementId);
+        runningAdapterInstances.remove(elementId);
+        runningAdapterDescriptionInstances.remove(elementId);
         return result;
     }
 
+    public Collection<AdapterDescription> getAllRunningAdapterDescriptions() {
+       return this.runningAdapterDescriptionInstances.values();
+    }
+
 
 }
diff --git a/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts b/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts
index 799d78c..d82c1e9 100644
--- a/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts
+++ b/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts
@@ -85,15 +85,6 @@ export class AdapterDescriptionComponent implements OnInit {
     });
   }
 
-  deleteAdapterTemplate(adapter: AdapterDescription): void {
-    this.adapterToDelete = adapter.elementId;
-    this.dataMarketplaceService.deleteAdapterTemplate(adapter).subscribe(res => {
-      this.adapterToDelete = undefined;
-      this.updateAdapterEmitter.emit();
-      this.deleting = false;
-    });
-  }
-
   createTemplate(adapter: AdapterDescription): void {
     this.createTemplateEmitter.emit(adapter);
   }