You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/09 05:42:03 UTC
[incubator-streampipes] branch STREAMPIPES-438 updated: Working on
recovery for adapters
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/STREAMPIPES-438 by this push:
new a721058 Working on recovery for adapters
a721058 is described below
commit a7210589305c76e3fd5c93abd33aa66c9ef3424a
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Sat Oct 9 07:41:40 2021 +0200
Working on recovery for adapters
---
.../master/health/AdapterHealthCheck.java | 28 ++++++++++++++++++----
.../master/management/WorkerRestClient.java | 26 +++++++++++++++-----
.../worker/management/AdapterWorkerManagement.java | 10 ++++++--
.../worker/rest/AdapterWorkerResource.java | 15 ++++++++----
.../management/AdapterWorkerManagementTest.java | 26 +++++++++-----------
.../connect/RunningAdapterInstances.java | 19 +++++++++++----
.../adapter-description.component.ts | 9 -------
7 files changed, 88 insertions(+), 45 deletions(-)
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 42577ba..0a6029c 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -20,31 +20,51 @@ package org.apache.streampipes.connect.container.master.health;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
+import org.apache.streampipes.connect.container.master.management.WorkerRestClient;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
-import java.util.List;
+import java.util.*;
public class AdapterHealthCheck {
/**
 * Creates a health check. The constructor body is empty, so no state is set up here.
 */
public AdapterHealthCheck() {
}
+ // TODO how can I test this code?
public void checkAndRestoreAdapters() {
AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
IAdapterStorage adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
+ List<AdapterDescription> allAdapersToRecover = new ArrayList<>();
// Get all adapters
List<AdapterDescription> allRunningInstancesAdaperDescription = adapterStorage.getAllAdapters();
- for (AdapterDescription adapterDescription : allRunningInstancesAdaperDescription) {
+ Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
+ allRunningInstancesAdaperDescription.forEach(ad -> {
+ String selectedEndpointUrl = ad.getSelectedEndpointUrl();
+ if (groupByWorker.containsKey(selectedEndpointUrl)) {
+ groupByWorker.get(selectedEndpointUrl).add(ad);
+ } else {
+ List<AdapterDescription> tmp = Arrays.asList(ad);
+ groupByWorker.put(selectedEndpointUrl, tmp);
+ }
+ });
- // Ask worker if they are up and running
+ groupByWorker.keySet().forEach(adapterEndpointUrl -> {
+ try {
+ List<AdapterDescription> allRunningInstancesOfOneWorker = WorkerRestClient.getAllRunningAdapterInstanceDescriptions("");
+ // TODO Remove all running adapters from allRunningInstancesAdaperDescription
+ } catch (AdapterException e) {
+ e.printStackTrace();
+ }
+ });
- // If not
+ for (AdapterDescription adapterDescription : allAdapersToRecover) {
+ // TODO how do I know there is a worker to start them?
// Invoke the adapters
try {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 5798c63..d8eec8e 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -78,15 +78,29 @@ public class WorkerRestClient {
stopAdapter(adapterSetDescription, url);
}
+    /**
+     * Requests the descriptions of all adapter instances from the given worker endpoint.
+     *
+     * @param url address that is queried with an HTTP GET (connect timeout 1000, socket timeout 100000)
+     * @return the parsed response body
+     * @throws AdapterException if the request fails or the response cannot be read or parsed
+     */
+    public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException {
+        try {
+            logger.info("Requesting all running adapter description instances: " + url);
+
+            String responseString = Request.Get(url)
+                    .connectTimeout(1000)
+                    .socketTimeout(100000)
+                    .execute().returnContent().asString();
+
+            // NOTE(review): the target type is the raw List.class, so the element type is not enforced here.
+            // Confirm that the mapper really yields AdapterDescription elements; otherwise switch to a typed
+            // read (TypeReference / constructCollectionType) to avoid a late ClassCastException in callers.
+            List<AdapterDescription> result = JacksonSerializer.getObjectMapper().readValue(responseString, List.class);
+
+            return result;
+        } catch (IOException e) {
+            // The full exception is logged here; the rethrown AdapterException only carries the message.
+            logger.error("List of running adapters could not be fetched", e);
+            throw new AdapterException("List of running adapters could not be fetched from: " + url);
+        }
+    }
+
public static void startAdapter(String url, AdapterDescription ad) throws AdapterException {
try {
logger.info("Trying to start adapter on endpoint: " + url);
- // this ensures that all adapters have a valid uri otherwise the json-ld serializer fails
-// if (ad.getUri() == null) {
-// ad.setUri("https://streampipes.org/adapter/" + UUID.randomUUID());
-// }
-
String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(ad);
String responseString = Request.Post(url)
@@ -107,7 +121,7 @@ public class WorkerRestClient {
public static void stopAdapter(AdapterDescription ad,
String url) throws AdapterException {
- // Stop execution of adatper
+ // Stop execution of adapter
try {
logger.info("Trying to stopAdapter adapter on endpoint: " + url);
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
index 0e5b06c..94ba63e 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
@@ -28,15 +28,21 @@ import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
+
public class AdapterWorkerManagement {
private static final Logger logger = LoggerFactory.getLogger(AdapterWorkerManagement.class);
+    /**
+     * Hands out the adapter descriptions held by the {@link RunningAdapterInstances} singleton.
+     *
+     * @return the collection returned by {@code getAllRunningAdapterDescriptions()}, passed through unchanged
+     */
+    public Collection<AdapterDescription> getAllRunningAdapterInstances() {
+        final Collection<AdapterDescription> runningDescriptions =
+                RunningAdapterInstances.INSTANCE.getAllRunningAdapterDescriptions();
+        return runningDescriptions;
+    }
+
/**
 * Creates an adapter for the given stream description, registers it and starts it.
 *
 * @param adapterStreamDescription description the adapter is built from; its element id is the registry key
 * @throws AdapterException declared by the signature; which of the calls below raises it is not visible here
 */
public void invokeStreamAdapter(AdapterStreamDescription adapterStreamDescription) throws AdapterException {
IAdapter<?> adapter = AdapterUtils.setAdapter(adapterStreamDescription);
// The adapter is registered before it is started.
// NOTE(review): if startAdapter() fails, the entry stays registered - confirm this is intended.
- RunningAdapterInstances.INSTANCE.addAdapter(adapterStreamDescription.getElementId(), adapter);
+ RunningAdapterInstances.INSTANCE.addAdapter(adapterStreamDescription.getElementId(), adapter, adapterStreamDescription);
adapter.startAdapter();
}
@@ -48,7 +54,7 @@ public class AdapterWorkerManagement {
IAdapter<?> adapter = AdapterUtils.setAdapter(adapterSetDescription);
- RunningAdapterInstances.INSTANCE.addAdapter(adapterSetDescription.getElementId(), adapter);
+ RunningAdapterInstances.INSTANCE.addAdapter(adapterSetDescription.getElementId(), adapter, adapterSetDescription);
adapter.changeEventGrounding(adapterSetDescription.getDataSet().getEventGrounding().getTransportProtocol());
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
index d252a12..a05e749 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
@@ -28,10 +28,7 @@ import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
+import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -50,6 +47,16 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
this.adapterManagement = adapterManagement;
}
+    /**
+     * GET endpoint on the relative path {@code /running}, producing JSON.
+     * Wraps the result of {@code adapterManagement.getAllRunningAdapterInstances()} with {@code ok(...)}.
+     *
+     * @return the response built by {@code ok(...)} around the running adapter descriptions
+     */
+    @GET
+    @Path("/running")
+    @Produces(MediaType.APPLICATION_JSON)
+    @JacksonSerialized
+    public Response getRunningAdapterInstances() {
+        return ok(this.adapterManagement.getAllRunningAdapterInstances());
+    }
+
+
@POST
@JacksonSerialized
@Path("/stream/invoke")
diff --git a/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java b/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java
index cf579c3..631f3f9 100644
--- a/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java
+++ b/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagementTest.java
@@ -18,28 +18,24 @@
package org.apache.streampipes.connect.container.worker.management;
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import org.apache.streampipes.connect.RunningAdapterInstances;
import org.apache.streampipes.connect.adapter.Adapter;
import org.apache.streampipes.connect.adapter.AdapterRegistry;
-import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.adapter.model.specific.SpecificDataSetAdapter;
+import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.worker.utils.Utils;
-import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
-import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
-import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
-import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription;
+import org.apache.streampipes.model.connect.adapter.*;
import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
+import static org.junit.Assert.*;
+
@RunWith(PowerMockRunner.class)
@PrepareForTest({ AdapterRegistry.class })
@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
@@ -64,7 +60,7 @@ public class AdapterWorkerManagementTest {
@Test
public void stopStreamAdapterSuccess() throws AdapterException {
TestAdapter testAdapter = getTestAdapterInstance();
- RunningAdapterInstances.INSTANCE.addAdapter("http://t.de/", testAdapter);
+ RunningAdapterInstances.INSTANCE.addAdapter("http://t.de/", testAdapter, null);
AdapterWorkerManagement adapterWorkerManagement = new AdapterWorkerManagement();
adapterWorkerManagement.stopStreamAdapter(Utils.getMinimalStreamAdapter());
@@ -92,7 +88,7 @@ public class AdapterWorkerManagementTest {
public void stopSetAdapterSuccess() throws AdapterException {
TestAdapter testAdapter = getTestAdapterInstance();
- RunningAdapterInstances.INSTANCE.addAdapter("http://t.de/", testAdapter);
+ RunningAdapterInstances.INSTANCE.addAdapter("http://t.de/", testAdapter, null);
AdapterWorkerManagement adapterWorkerManagement = new AdapterWorkerManagement();
adapterWorkerManagement.stopSetAdapter(Utils.getMinimalSetAdapter());
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
index 22baaac..d4cf08d 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
@@ -19,24 +19,33 @@
package org.apache.streampipes.connect;
import org.apache.streampipes.connect.api.IAdapter;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
 * Enum singleton that keeps, per element id, the adapter instance and the description it was registered with.
 *
 * NOTE(review): both maps are plain HashMaps without any synchronisation - confirm that all access happens
 * on a single thread, or guard the maps.
 */
public enum RunningAdapterInstances {
INSTANCE;
// Adapter instances keyed by element id.
- private final Map<String, IAdapter<?>> runningInstances = new HashMap<>();
+ private final Map<String, IAdapter<?>> runningAdapterInstances = new HashMap<>();
// Descriptions keyed by the same element id as the adapter map above.
+ private final Map<String, AdapterDescription> runningAdapterDescriptionInstances = new HashMap<>();
/**
 * Stores the adapter and its description under {@code elementId} in both maps.
 * An entry already present for that id is replaced. A null description is stored as-is.
 */
- public void addAdapter(String elementId, IAdapter<?> adapter) {
- runningInstances.put(elementId, adapter);
+ public void addAdapter(String elementId, IAdapter<?> adapter, AdapterDescription adapterDescription) {
+ runningAdapterInstances.put(elementId, adapter);
+ runningAdapterDescriptionInstances.put(elementId, adapterDescription);
}
/**
 * Removes the entries for {@code elementId} from both maps.
 *
 * @return the adapter that was registered under the id, or null if there was none
 */
public IAdapter<?> removeAdapter(String elementId) {
- IAdapter<?> result = runningInstances.get(elementId);
- runningInstances.remove(elementId);
+ IAdapter<?> result = runningAdapterInstances.get(elementId);
+ runningAdapterInstances.remove(elementId);
+ runningAdapterDescriptionInstances.remove(elementId);
return result;
}
/**
 * Returns the descriptions of all registered adapters.
 *
 * NOTE(review): this is the values() view of the internal HashMap, not a copy. Callers observe later
 * add/remove calls and can remove entries through it; consider returning a copy if that is not wanted.
 */
+ public Collection<AdapterDescription> getAllRunningAdapterDescriptions() {
+ return this.runningAdapterDescriptionInstances.values();
+ }
+
}
diff --git a/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts b/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts
index 799d78c..d82c1e9 100644
--- a/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts
+++ b/ui/src/app/connect/components/data-marketplace/adapter-description/adapter-description.component.ts
@@ -85,15 +85,6 @@ export class AdapterDescriptionComponent implements OnInit {
});
}
- deleteAdapterTemplate(adapter: AdapterDescription): void {
- this.adapterToDelete = adapter.elementId;
- this.dataMarketplaceService.deleteAdapterTemplate(adapter).subscribe(res => {
- this.adapterToDelete = undefined;
- this.updateAdapterEmitter.emit();
- this.deleting = false;
- });
- }
-
/** Emits the given adapter on `createTemplateEmitter`. */
createTemplate(adapter: AdapterDescription): void {
this.createTemplateEmitter.emit(adapter);
}