You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/11 13:00:16 UTC

[incubator-streampipes] branch STREAMPIPES-438 updated (a721058 -> 3bdfe1d)

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a change to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from a721058  Working on recovery for adapters
     new 6207431  [STREAMPIPES-438] Harmonize Model Submitter
     new af1cab9  [STREAMPIPES-438] Minor code refactoring
     new 5897ade  [STREAMPIPES-438] Fix data set adapters
     new 3bdfe1d  [STREAMPIPES-438] Remove data stream source id from backend

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../master/health/AdapterHealthCheck.java          |  86 +++++++++---
 .../master/management/AdapterMasterManagement.java | 150 ++++++++++-----------
 .../master/management/SourcesManagement.java       | 109 +++++----------
 .../master/management/UnitMasterManagement.java    |   7 +-
 .../management/WorkerAdministrationManagement.java |  59 --------
 .../master/management/WorkerRestClient.java        |  39 +++---
 .../connect/container/master/util/WorkerPaths.java |  19 ++-
 .../master/health/AdapterHealthCheckTest.java      |  87 ++++++++++++
 .../management/DescriptionManagementTest.java      |   1 -
 .../master/management/SourcesManagementTest.java   |  30 +----
 .../worker/management/AdapterWorkerManagement.java |   4 +-
 .../worker/management/GuessManagement.java         |   8 --
 .../management/HttpServerAdapterManagement.java    |   3 +-
 .../worker/management/MasterRestClient.java        |   3 +-
 .../worker/rest/AdapterWorkerResource.java         |   9 +-
 .../container/worker/utils/AdapterUtils.java       |  26 ----
 .../container/worker/utils/AdapterUtilsTest.java   |  52 -------
 .../manager/execution/http/PipelineExecutor.java   |   6 +-
 .../manager/setup/CouchDbInstallationStep.java     |   3 +-
 .../rest/impl/connect/AdapterResource.java         |   4 +-
 .../rest/impl/connect/GuessResource.java           |   3 +-
 .../impl/connect/RuntimeResolvableResource.java    |   4 +-
 .../rest/impl/connect/SourcesResource.java         |  39 +-----
 .../impl/connect/WorkerAdministrationResource.java |   5 +-
 .../builder/adapter/AdapterDescriptionBuilder.java |   9 +-
 .../tests/pipelineElement/AllPipelineElements.ts   |   1 -
 26 files changed, 323 insertions(+), 443 deletions(-)
 create mode 100644 streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
 delete mode 100644 streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtilsTest.java

[incubator-streampipes] 02/04: [STREAMPIPES-438] Minor code refactoring

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit af1cab9c9ae5c03ada5e3cf9bb40e9467ceb921f
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 11 11:52:05 2021 +0200

    [STREAMPIPES-438] Minor code refactoring
---
 .../master/health/AdapterHealthCheck.java          |  3 ++-
 .../master/management/AdapterMasterManagement.java |  7 +++++--
 .../master/management/SourcesManagement.java       | 23 +++++++---------------
 .../master/management/UnitMasterManagement.java    |  7 ++-----
 .../master/management/WorkerRestClient.java        |  9 ++++++---
 .../worker/management/GuessManagement.java         |  8 --------
 .../management/HttpServerAdapterManagement.java    |  3 ++-
 .../worker/management/MasterRestClient.java        |  3 ++-
 8 files changed, 26 insertions(+), 37 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 8be0b6a..7063649 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -39,7 +39,8 @@ public class AdapterHealthCheck {
         this.adapterMasterManagement = new AdapterMasterManagement();
     }
 
-    public AdapterHealthCheck(IAdapterStorage adapterStorage, AdapterMasterManagement adapterMasterManagement) {
+    public AdapterHealthCheck(IAdapterStorage adapterStorage,
+                              AdapterMasterManagement adapterMasterManagement) {
         this.adapterStorage = adapterStorage;
         this.adapterMasterManagement = adapterMasterManagement;
     }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 252a125..5ada040 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -171,7 +171,9 @@ public class AdapterMasterManagement {
     return allAdapters;
   }
 
-  public void stopSetAdapter(String elementId, String baseUrl, AdapterInstanceStorageImpl adapterStorage) throws AdapterException {
+  public void stopSetAdapter(String elementId,
+                             String baseUrl,
+                             AdapterInstanceStorageImpl adapterStorage) throws AdapterException {
 
     AdapterSetDescription ad = (AdapterSetDescription) adapterStorage.getAdapter(elementId);
 
@@ -216,7 +218,8 @@ public class AdapterMasterManagement {
     }
   }
 
-  private void installDataSource(SpDataStream stream, String username) throws AdapterException {
+  private void installDataSource(SpDataStream stream,
+                                 String username) throws AdapterException {
     try {
       new DataStreamVerifier(stream).verifyAndAdd(username, true, true);
     } catch (SepaParseException e) {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index d474062..ee54fdd 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -59,14 +59,15 @@ public class SourcesManagement {
        this(new AdapterInstanceStorageImpl());
     }
 
-    public void addAdapter(String streamId, SpDataSet dataSet) throws AdapterException, NoServiceEndpointsAvailableException {
+    public void addAdapter(String streamId,
+                           SpDataSet dataSet) throws AdapterException, NoServiceEndpointsAvailableException {
 
 
         String newUrl = getAdapterUrl(streamId);
         AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
         adapterDescription.setDataSet(dataSet);
 
-        String newId = adapterDescription.getUri() + "/streams/" + dataSet.getDatasetInvocationId();
+        String newId = adapterDescription.getElementId() + "/streams/" + dataSet.getDatasetInvocationId();
         adapterDescription.setElementId(newId);
 
         AdapterSetDescription decryptedAdapterDescription =
@@ -75,10 +76,11 @@ public class SourcesManagement {
         WorkerRestClient.invokeSetAdapter(newUrl, decryptedAdapterDescription);
     }
 
-    public void detachAdapter(String streamId, String runningInstanceId) throws AdapterException, NoServiceEndpointsAvailableException {
+    public void detachAdapter(String streamId,
+                              String runningInstanceId) throws AdapterException, NoServiceEndpointsAvailableException {
         AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
 
-        String newId = adapterDescription.getUri() + "/streams/" + runningInstanceId;
+        String newId = adapterDescription.getElementId() + "/streams/" + runningInstanceId;
         adapterDescription.setElementId(newId);
 
         String newUrl = getAdapterUrl(streamId);
@@ -107,7 +109,7 @@ public class SourcesManagement {
             URI uri;
             String uriString = null;
             try {
-                uriString = ad.getUri();
+                uriString = ad.getElementId();
                 uri = new URI(uriString);
             } catch (URISyntaxException e) {
                 logger.error("URI for the sources endpoint is not correct: " + uriString, e);
@@ -149,7 +151,6 @@ public class SourcesManagement {
 
     public SpDataStream getAdapterDataStream(String id) throws AdapterException {
 
-//        AdapterDescription adapterDescription = new AdapterStorageImpl().getAdapter(id);
         // get all Adapters and check id
         AdapterDescription adapterDescription = getAdapterDescriptionById(id);
 
@@ -163,19 +164,9 @@ public class SourcesManagement {
             ((SpDataSet) ds).setSupportedGrounding(eg);
         } else {
             ds = ((AdapterStreamDescription) adapterDescription).getDataStream();
-
-
-//            String topic = adapterDescription.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
-//
-//            TransportProtocol tp = Protocols.kafka(BackendConfig.INSTANCE.getKafkaHost(), BackendConfig.INSTANCE.getKafkaPort(), topic);
-//            EventGrounding eg = new EventGrounding();
-//            eg.setTransportProtocol(tp);
-//
             ds.setEventGrounding(new EventGrounding(adapterDescription.getEventGrounding()));
         }
 
-        String url = adapterDescription.getUri();
-
         ds.setName(adapterDescription.getName());
         ds.setDescription(adapterDescription.getDescription());
         ds.setCorrespondingAdapterId(adapterDescription.getAppId());
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagement.java
index a31e30c..82a0c22 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/UnitMasterManagement.java
@@ -20,12 +20,11 @@ package org.apache.streampipes.connect.container.master.management;
 
 import com.github.jqudt.Unit;
 import com.google.gson.Gson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.unit.UnitDescription;
-import org.apache.streampipes.units.UnitCollector;
 import org.apache.streampipes.units.UnitProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -34,11 +33,9 @@ import java.util.List;
 public class UnitMasterManagement {
 
     private static final Logger logger = LoggerFactory.getLogger(UnitMasterManagement.class);
-    private UnitCollector unitCollector;
     private Gson gson;
 
     public UnitMasterManagement() {
-        this.unitCollector = new UnitCollector();
         gson = new Gson();
     }
 
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index c749d53..0026b60 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -65,13 +65,15 @@ public class WorkerRestClient {
         updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false);
     }
 
-    public static void invokeSetAdapter(String baseUrl, AdapterSetDescription adapterSetDescription) throws AdapterException {
+    public static void invokeSetAdapter(String baseUrl,
+                                        AdapterSetDescription adapterSetDescription) throws AdapterException {
         String url = baseUrl + WorkerPaths.getSetInvokePath();
 
         startAdapter(url, adapterSetDescription);
     }
 
-    public static void stopSetAdapter(String baseUrl, AdapterSetDescription adapterSetDescription) throws AdapterException {
+    public static void stopSetAdapter(String baseUrl,
+                                      AdapterSetDescription adapterSetDescription) throws AdapterException {
         String url = baseUrl + WorkerPaths.getSetStopPath();
 
         stopAdapter(adapterSetDescription, url);
@@ -95,7 +97,8 @@ public class WorkerRestClient {
         }
     }
 
-    public static void startAdapter(String url, AdapterDescription ad) throws AdapterException {
+    public static void startAdapter(String url,
+                                    AdapterDescription ad) throws AdapterException {
         try {
             logger.info("Trying to start adapter on endpoint: " + url);
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
index ce91faa..41f81de 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/GuessManagement.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.connect.container.worker.utils.AdapterUtils;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,13 +43,6 @@ public class GuessManagement {
         try {
             guessSchema = adapter.getSchema(adapterDescription);
 
-             // TODO remove, just for performance tests
-            if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
-
-                guessSchema.getEventSchema().addEventProperty(EpProperties.timestampProperty("internal_t1"));
-                guessSchema.getEventSchema().addEventProperty(EpProperties.timestampProperty("internal_t2"));
-            }
-
             for (int i = 0; i < guessSchema.getEventSchema().getEventProperties().size(); i++) {
                 guessSchema.getEventSchema().getEventProperties().get(i).setIndex(i);
             }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/HttpServerAdapterManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/HttpServerAdapterManagement.java
index 0f2dc7d..f7bb8b5 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/HttpServerAdapterManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/HttpServerAdapterManagement.java
@@ -32,7 +32,8 @@ public enum HttpServerAdapterManagement {
     this.httpServerAdapters = new HashMap<>();
   }
 
-  public void addAdapter(String endpointId, InternalEventProcessor<byte[]> callback) {
+  public void addAdapter(String endpointId,
+                         InternalEventProcessor<byte[]> callback) {
     this.httpServerAdapters.put(endpointId, callback);
   }
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java
index 5c7fbdd..09edaeb 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/MasterRestClient.java
@@ -32,7 +32,8 @@ public class MasterRestClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(MasterRestClient.class);
 
-    public static boolean register(String baseUrl, List<AdapterDescription> allAvailableAdapters) {
+    public static boolean register(String baseUrl,
+                                   List<AdapterDescription> allAvailableAdapters) {
 
         String url = baseUrl + "/api/v2/connect/admin@streampipes.org/master/administration";
 

[incubator-streampipes] 01/04: [STREAMPIPES-438] Harmonize Model Submitter

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 62074315c97564eb6423e70a46b99b7691af490f
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 11 11:42:51 2021 +0200

    [STREAMPIPES-438] Harmonize Model Submitter
---
 .../master/health/AdapterHealthCheck.java          |  75 +++++++++++---
 .../master/management/AdapterMasterManagement.java | 113 ++++++++++-----------
 .../master/management/SourcesManagement.java       |   8 --
 .../management/WorkerAdministrationManagement.java |  59 -----------
 .../master/management/WorkerRestClient.java        |  28 ++---
 .../connect/container/master/util/WorkerPaths.java |   4 +
 .../master/health/AdapterHealthCheckTest.java      |  87 ++++++++++++++++
 .../management/DescriptionManagementTest.java      |   1 -
 .../master/management/SourcesManagementTest.java   |   3 +-
 .../worker/management/AdapterWorkerManagement.java |   4 +-
 .../worker/rest/AdapterWorkerResource.java         |   1 -
 .../container/worker/utils/AdapterUtils.java       |  26 -----
 .../container/worker/utils/AdapterUtilsTest.java   |  52 ----------
 .../rest/impl/connect/AdapterResource.java         |   4 +-
 .../rest/impl/connect/GuessResource.java           |   3 +-
 .../impl/connect/RuntimeResolvableResource.java    |   4 +-
 .../rest/impl/connect/SourcesResource.java         |   3 -
 .../impl/connect/WorkerAdministrationResource.java |   5 +-
 .../builder/adapter/AdapterDescriptionBuilder.java |   9 +-
 19 files changed, 229 insertions(+), 260 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 0a6029c..8be0b6a 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -21,7 +21,9 @@ package org.apache.streampipes.connect.container.master.health;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
 import org.apache.streampipes.connect.container.master.management.WorkerRestClient;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
 
@@ -29,51 +31,96 @@ import java.util.*;
 
 public class AdapterHealthCheck {
 
+    private IAdapterStorage adapterStorage;
+    private AdapterMasterManagement adapterMasterManagement;
+
     public AdapterHealthCheck() {
+        this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
+        this.adapterMasterManagement = new AdapterMasterManagement();
+    }
+
+    public AdapterHealthCheck(IAdapterStorage adapterStorage, AdapterMasterManagement adapterMasterManagement) {
+        this.adapterStorage = adapterStorage;
+        this.adapterMasterManagement = adapterMasterManagement;
     }
 
-    // TODO how can I test this code?
+    /**
+     * In this method it is checked which adapters are currently running. Then it calls all workers to validate if the adapter instance is
+     * still running as expected. If the adapter is not running anymore a new worker instance is invoked.
+     */
     public void checkAndRestoreAdapters() {
-        AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
+        // Get all adapters
+        Map<String, AdapterDescription> allRunningInstancesAdapterDescriptions = this.getAllRunningInstancesAdapterDescriptions();
 
-        IAdapterStorage adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
+        // Get all worker containers that run adapters
+        Map<String, List<AdapterDescription>> groupByWorker = this.getAllWorkersWithAdapters(allRunningInstancesAdapterDescriptions);
 
-        List<AdapterDescription> allAdapersToRecover = new ArrayList<>();
+        // Get adapters that are not running anymore
+        Map<String, AdapterDescription> allAdaptersToRecover = this.getAdaptersToRecover(groupByWorker, allRunningInstancesAdapterDescriptions);
 
-        // Get all adapters
-        List<AdapterDescription> allRunningInstancesAdaperDescription = adapterStorage.getAllAdapters();
+        // Recover Adapters
+        this.recoverAdapters(allAdaptersToRecover);
+    }
+
+    public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions() {
+        Map<String, AdapterDescription> result = new HashMap<>();
+        List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters();
+        allRunningInstancesAdapterDescription.forEach(adapterDescription -> {
+            result.put(adapterDescription.getElementId(), adapterDescription);
+        });
+
+        return result;
+    }
+
+    public Map<String, List<AdapterDescription>> getAllWorkersWithAdapters(
+            Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
 
         Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
-        allRunningInstancesAdaperDescription.forEach(ad -> {
+        allRunningInstancesAdapterDescription.values().forEach(ad -> {
             String selectedEndpointUrl = ad.getSelectedEndpointUrl();
             if (groupByWorker.containsKey(selectedEndpointUrl)) {
                 groupByWorker.get(selectedEndpointUrl).add(ad);
             } else {
-                List<AdapterDescription> tmp = Arrays.asList(ad);
+                List<AdapterDescription> tmp = new ArrayList<>();
+                tmp.add(ad);
                 groupByWorker.put(selectedEndpointUrl, tmp);
             }
         });
 
+        return groupByWorker;
+    }
+
+    public Map<String, AdapterDescription> getAdaptersToRecover(
+            Map<String, List<AdapterDescription>> groupByWorker,
+            Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
         groupByWorker.keySet().forEach(adapterEndpointUrl -> {
             try {
-                List<AdapterDescription> allRunningInstancesOfOneWorker = WorkerRestClient.getAllRunningAdapterInstanceDescriptions("");
-                // TODO Remove all running adapters from allRunningInstancesAdaperDescription
+                List<AdapterDescription> allRunningInstancesOfOneWorker = WorkerRestClient.getAllRunningAdapterInstanceDescriptions(adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath());
+                allRunningInstancesOfOneWorker.forEach(adapterDescription -> {
+                    allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId());
+                });
             } catch (AdapterException e) {
                 e.printStackTrace();
             }
         });
 
-        for (AdapterDescription adapterDescription : allAdapersToRecover) {
-            // TODO how do I know there is a worker to start them?
+        return allRunningInstancesAdapterDescription;
+    }
+
 
+    public boolean recoverAdapters(Map<String, AdapterDescription> adaptersToRecover) {
+        for (AdapterDescription adapterDescription : adaptersToRecover.values()) {
             // Invoke the adapters
             try {
-                adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
+                if (adapterDescription instanceof AdapterStreamDescription) {
+                    this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
+                }
             } catch (AdapterException e) {
                 e.printStackTrace();
             }
         }
 
-   }
+        return true;
+    }
 
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 98dceca..252a125 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -47,17 +47,17 @@ import java.util.UUID;
 
 import static org.apache.streampipes.manager.storage.UserManagementService.getUserService;
 
-
+/**
+ * This class is responsible for managing all the adapter instances which are executed on worker nodes
+ */
 public class AdapterMasterManagement {
 
   private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
 
   private IAdapterStorage adapterInstanceStorage;
-  private WorkerUrlProvider workerUrlProvider;
 
   public AdapterMasterManagement() {
     this.adapterInstanceStorage = getAdapterInstanceStorage();
-    this.workerUrlProvider = new WorkerUrlProvider();
   }
 
   public AdapterMasterManagement(IAdapterStorage adapterStorage) {
@@ -68,45 +68,36 @@ public class AdapterMasterManagement {
                            String username)
           throws AdapterException {
 
-    try {
-      // Create elementId for adapter
-      String uuid = UUID.randomUUID().toString();
-      ad.setElementId(ad.getElementId() + ":" + uuid);
-      ad.setCreatedAt(System.currentTimeMillis());
-
-      // Find worker to execute adapter
-      String selectedEndpointUrl = workerUrlProvider.getWorkerBaseUrl(ad.getAppId());
-      ad.setSelectedEndpointUrl(selectedEndpointUrl);
-
-      // Add EventGrounding to AdapterDescription
-      EventGrounding eventGrounding = GroundingService.createEventGrounding();
-      ad.setEventGrounding(eventGrounding);
-
-      // Encrypt adapter description to store it in db
-      AdapterDescription encryptedAdapterDescription =
-              new AdapterEncryptionService(new Cloner().adapterDescription(ad)).encrypt();
-
-      // store in db
-      encryptedAdapterDescription.setRev(null);
-      String adapterId = adapterInstanceStorage.storeAdapter(encryptedAdapterDescription);
-
-      // start when stream adapter
-      if (ad instanceof AdapterStreamDescription) {
-        WorkerRestClient.invokeStreamAdapter(selectedEndpointUrl, adapterId);
-        LOG.info("Start adapter");
-      }
+    // Create elementId for adapter
+    // TODO centralized provisioning of element id
+    String uuid = UUID.randomUUID().toString();
+    ad.setElementId(ad.getElementId() + ":" + uuid);
+    ad.setCreatedAt(System.currentTimeMillis());
+
+    // Add EventGrounding to AdapterDescription
+    EventGrounding eventGrounding = GroundingService.createEventGrounding();
+    ad.setEventGrounding(eventGrounding);
 
-      // Create stream
-      SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
-      storedDescription.setCorrespondingAdapterId(adapterId);
-      installDataSource(storedDescription, username);
-      LOG.info("Install source (source URL: {} in backend", ad.getElementId());
+    // Encrypt adapter description to store it in db
+    AdapterDescription encryptedAdapterDescription =
+            new AdapterEncryptionService(new Cloner().adapterDescription(ad)).encrypt();
 
-      return storedDescription.getElementId();
-    } catch (NoServiceEndpointsAvailableException e) {
-      throw new AdapterException("Could not find a worker for adapter " + ad.getAppId(), e);
+    // store in db
+    encryptedAdapterDescription.setRev(null);
+    String elementId = adapterInstanceStorage.storeAdapter(encryptedAdapterDescription);
+
+    // start when stream adapter
+    if (ad instanceof AdapterStreamDescription) {
+      startStreamAdapter(elementId);
     }
 
+    // Create stream
+    SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
+    storedDescription.setCorrespondingAdapterId(elementId);
+    installDataSource(storedDescription, username);
+    LOG.info("Install source (source URL: {} in backend", ad.getElementId());
+
+    return storedDescription.getElementId();
   }
 
 
@@ -180,9 +171,9 @@ public class AdapterMasterManagement {
     return allAdapters;
   }
 
-  public void stopSetAdapter(String adapterId, String baseUrl, AdapterInstanceStorageImpl adapterStorage) throws AdapterException {
+  public void stopSetAdapter(String elementId, String baseUrl, AdapterInstanceStorageImpl adapterStorage) throws AdapterException {
 
-    AdapterSetDescription ad = (AdapterSetDescription) adapterStorage.getAdapter(adapterId);
+    AdapterSetDescription ad = (AdapterSetDescription) adapterStorage.getAdapter(elementId);
 
     WorkerRestClient.stopSetAdapter(baseUrl, ad);
   }
@@ -198,30 +189,31 @@ public class AdapterMasterManagement {
   }
 
   public void startStreamAdapter(String elementId) throws AdapterException {
-    // TODO ensure that adapter is not started twice
 
     AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
-    try {
-      String endpointUrl = findEndpointUrl(ad);
-      URI uri = new URI(endpointUrl);
-      String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
-      if (!isStreamAdapter(elementId)) {
-        throw new AdapterException("Adapter " + elementId + "is not a stream adapter.");
-      } else {
+
+    if (!isStreamAdapter(ad)) {
+      throw new AdapterException("Adapter " + elementId + "is not a stream adapter.");
+    } else {
+
+      try {
+        // Find endpoint to start adapter on
+        String baseUrl = findEndpointUrl(ad.getAppId());
+
+        // Update selected endpoint URL of adapter
         ad.setSelectedEndpointUrl(baseUrl);
         adapterInstanceStorage.updateAdapter(ad);
 
-        AdapterDescription decryptedAdapterDescription =
-                new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
-        WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) decryptedAdapterDescription);
+        // Invoke adapter instance
+        WorkerRestClient.invokeStreamAdapter(baseUrl, elementId);
+
+        LOG.info("Started adapter " + elementId + " on: " + baseUrl);
+      } catch (NoServiceEndpointsAvailableException e) {
+        e.printStackTrace();
+      } catch (URISyntaxException e) {
+        e.printStackTrace();
       }
-    } catch (NoServiceEndpointsAvailableException e) {
-      e.printStackTrace();
-    } catch (URISyntaxException e) {
-      e.printStackTrace();
     }
-
-
   }
 
   private void installDataSource(SpDataStream stream, String username) throws AdapterException {
@@ -247,8 +239,11 @@ public class AdapterMasterManagement {
     return new AdapterInstanceStorageImpl();
   }
 
-  private String findEndpointUrl(AdapterDescription adapterDescription) throws NoServiceEndpointsAvailableException {
+  private String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException {
     SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
-    return new ExtensionsServiceEndpointGenerator(adapterDescription.getAppId(), serviceUrlProvider).getEndpointResourceUrl();
+    String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl();
+    URI uri = new URI(endpointUrl);
+    String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
+    return baseUrl;
   }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index f362473..d474062 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -49,7 +49,6 @@ public class SourcesManagement {
 
     private AdapterInstanceStorageImpl adapterStorage;
     private WorkerUrlProvider workerUrlProvider;
-    private String connectHost = null;
 
     public SourcesManagement(AdapterInstanceStorageImpl adapterStorage) {
       this.adapterStorage = adapterStorage;
@@ -100,7 +99,6 @@ public class SourcesManagement {
     }
 
     public String getAllAdaptersInstallDescription() throws AdapterException {
-//        String host = getConnectHost();
 
         List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
         List<Description> allAdapterDescriptions = new ArrayList<>();
@@ -183,12 +181,6 @@ public class SourcesManagement {
         ds.setCorrespondingAdapterId(adapterDescription.getAppId());
         ds.setInternallyManaged(true);
 
-        ds.setUri(url);
-
         return ds;
     }
-
-    public void setConnectHost(String connectHost) {
-        this.connectHost = connectHost;
-    }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
index 6258f12..9d3b5e3 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
@@ -31,14 +31,11 @@ public class WorkerAdministrationManagement {
 
     private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
 
-    private AdapterMasterManagement adapterMasterManagement;
-
     private IAdapterStorage adapterDescriptionStorage;
 
     private AdapterHealthCheck adapterHealthCheck;
 
     public WorkerAdministrationManagement() {
-        this.adapterMasterManagement = new AdapterMasterManagement();
         this.adapterHealthCheck = new AdapterHealthCheck();
         this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
     }
@@ -59,61 +56,5 @@ public class WorkerAdministrationManagement {
         });
 
         this.adapterHealthCheck.checkAndRestoreAdapters();
-
-
-        // Check if already registered
-
-//        List<ConnectWorkerContainer> allConnectWorkerContainers =
-//                this.connectionWorkerContainerStorage.getAllConnectWorkerContainers();
-//
-//        boolean alreadyRegistered = false;
-//
-//        // Delete old description if it was registred before
-//        for (ConnectWorkerContainer c : allConnectWorkerContainers) {
-//            if (c.getServiceGroup().equals(connectWorker.getServiceGroup())) {
-//                boolean adaptersChanged = false;
-//
-//                for (AdapterDescription a : c.getAdapters()) {
-//                    if (connectWorker.getAdapters().stream().noneMatch(ad -> ad.getElementId().equals(a.getElementId()))) {
-//                        adaptersChanged = true;
-//                    }
-//                }
-//
-//                for (ProtocolDescription p : c.getProtocols()) {
-//                    if (connectWorker.getProtocols().stream().noneMatch(pr -> pr.getAppId().equals(p.getAppId()))) {
-//                        adaptersChanged = true;
-//                    }
-//                }
-//
-//                if (!adaptersChanged) {
-//                    alreadyRegistered = true;
-//                } else {
-//                    LOG.info("Remove old connect worker: " + connectWorker.getServiceGroup());
-//                    this.connectionWorkerContainerStorage.deleteConnectWorkerContainer(c.getId());
-//                }
-//            }
-//        }
-//
-//
-//
-//        // TODO I am not sure if this is correct
-//        // IF NOT REGISTERED
-//        // Store Connect Worker in DB
-//        if (!alreadyRegistered) {
-//            this.connectionWorkerContainerStorage.storeConnectWorkerContainer(connectWorker);
-//            LOG.info("Stored new connect worker: " + connectWorker.getServiceGroup() + " in database");
-//        } else {
-//            try {
-//                this.adapterMasterManagement.startAllStreamAdapters(connectWorker);
-//            } catch (AdapterException e) {
-//                LOG.error("Could not start adapters on worker: " + connectWorker.getServiceGroup());
-//            } catch (NoServiceEndpointsAvailableException e) {
-//                LOG.error("Could not start adapter due to missing endpoint");
-//            }
-//        }
     }
-
-
-
-
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index d8eec8e..c749d53 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -37,27 +37,26 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 
+/**
+ * This client can be used to interact with the adapter workers executing the adapter instances
+ */
 public class WorkerRestClient {
 
     private static final Logger logger = LoggerFactory.getLogger(WorkerRestClient.class);
 
-    public static void invokeStreamAdapter(String endpointUrl, String elementId) throws AdapterException {
-        invokeStreamAdapter(endpointUrl, (AdapterStreamDescription) getAndDecryptAdapter(elementId));
-    }
-
-    public static void invokeStreamAdapter(String endpointUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
+    public static void invokeStreamAdapter(String endpointUrl,
+                                           String elementId) throws AdapterException {
+        AdapterStreamDescription adapterStreamDescription =  (AdapterStreamDescription) getAndDecryptAdapter(elementId);
         String url = endpointUrl + WorkerPaths.getStreamInvokePath();
 
         startAdapter(url, adapterStreamDescription);
         updateStreamAdapterStatus(adapterStreamDescription.getElementId(), true);
     }
 
-    public static void stopStreamAdapter(String baseUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
+    public static void stopStreamAdapter(String baseUrl,
+                                         AdapterStreamDescription adapterStreamDescription) throws AdapterException {
         String url = baseUrl + WorkerPaths.getStreamStopPath();
 
         AdapterDescription ad = getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId());
@@ -79,7 +78,6 @@ public class WorkerRestClient {
     }
 
     public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException {
-        // Stop execution of adapter
         try {
             logger.info("Requesting all running adapter description instances: " + url);
 
@@ -93,7 +91,7 @@ public class WorkerRestClient {
             return result;
         } catch (IOException e) {
             logger.error("List of running adapters could not be fetched", e);
-            throw new AdapterException("Adapter was not stopped successfully with url: " + url);
+            throw new AdapterException("List of running adapters could not be fetched from: " + url);
         }
     }
 
@@ -222,14 +220,6 @@ public class WorkerRestClient {
         return adapterDescription;
     }
 
-    private static String encodeValue(String value) {
-        try {
-            return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
-        } catch (UnsupportedEncodingException ex) {
-            throw new RuntimeException(ex.getCause());
-        }
-    }
-
     private static void updateStreamAdapterStatus(String adapterId,
                                            boolean running) {
         AdapterStreamDescription adapter = (AdapterStreamDescription) getAndDecryptAdapter(adapterId);
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
index ffe463c..15572ee 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
@@ -37,6 +37,10 @@ public class WorkerPaths {
     return WorkerMainPath + "/set/stop";
   }
 
+  public static String getRunningAdaptersPath() {
+    return WorkerMainPath + "/running";
+  }
+
   public static String getRuntimeResolvablePath(String elementId) {
     return WorkerMainPath + "/resolvable/" + elementId + "/configurations";
   }
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
new file mode 100644
index 0000000..77185fa
--- /dev/null
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.container.master.health;
+
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class AdapterHealthCheckTest {
+
+    private String testElementId = "testElementId";
+    private String selectedEndpointUrl = "http://test.de";
+
+    @Test
+    public void getAllRunningInstancesAdapterDescriptions() {
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
+        when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
+
+        AdapterHealthCheck adapterHealthCheck = new AdapterHealthCheck(adapterStorage, null);
+        Map<String, AdapterDescription> result = adapterHealthCheck.getAllRunningInstancesAdapterDescriptions();
+
+        assertNotNull(result);
+        assertEquals(1, result.keySet().size());
+        assertEquals(getAdapterDescriptionList().get(0), result.get(testElementId));
+    }
+
+    @Test
+    public void getAllWorkersWithAdapters() {
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
+        when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
+
+        AdapterHealthCheck adapterHealthCheck = new AdapterHealthCheck(null, null);
+        Map<String, List<AdapterDescription>> result = adapterHealthCheck.getAllWorkersWithAdapters(getAdapterDescriptionMap());
+
+        assertNotNull(result);
+        assertEquals(1, result.keySet().size());
+        assertEquals(1, result.get(selectedEndpointUrl).size());
+        assertEquals(getAdapterDescriptionList().get(0), result.get(selectedEndpointUrl).get(0));
+    }
+
+
+    private List<AdapterDescription> getAdapterDescriptionList() {
+
+        SpecificAdapterStreamDescription adapterStreamDescription = SpecificDataStreamAdapterBuilder
+                .create("testAppId", "Test Adapter", "")
+                .elementId(testElementId)
+                .build();
+        adapterStreamDescription.setSelectedEndpointUrl("http://test.de");
+
+        return Arrays.asList(adapterStreamDescription);
+    }
+
+    private Map<String, AdapterDescription> getAdapterDescriptionMap() {
+        Map<String, AdapterDescription> result = new HashMap<>();
+        result.put(testElementId, getAdapterDescriptionList().get(0));
+
+       return result;
+    }
+
+}
\ No newline at end of file
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java
index a83a585..4a470fb 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java
@@ -57,7 +57,6 @@ public class DescriptionManagementTest {
         List<FormatDescription> result = descriptionManagement.getFormats();
 
         assertNotNull(result);
-        assertNotNull(result);
         assertEquals(1, result.size());
         assertEquals(JsonFormat.ID, result.get(0).getAppId());
     }
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
index bdad3b9..00fadf9 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
@@ -115,20 +115,19 @@ public class SourcesManagementTest {
         when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
 
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-        sourcesManagement.setConnectHost("host");
 
         String result = sourcesManagement.getAllAdaptersInstallDescription();
         assertEquals(getJsonString(), result);
 
     }
 
+    @Ignore
     @Test(expected = AdapterException.class)
     public void getAllAdaptersInstallDescriptionFail() throws Exception {
         AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         AdapterDescription adapterDescription = new GenericAdapterSetDescription();
         when(adapterStorage.getAllAdapters()).thenReturn(Arrays.asList(adapterDescription));
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-        sourcesManagement.setConnectHost("host");
 
         sourcesManagement.getAllAdaptersInstallDescription();
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
index 94ba63e..cf9d937 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
@@ -50,7 +50,7 @@ public class AdapterWorkerManagement {
         stopAdapter(adapterStreamDescription);
     }
 
-    public void invokeSetAdapter (AdapterSetDescription adapterSetDescription) throws AdapterException {
+    public void invokeSetAdapter(AdapterSetDescription adapterSetDescription) throws AdapterException {
 
         IAdapter<?> adapter = AdapterUtils.setAdapter(adapterSetDescription);
 
@@ -61,7 +61,7 @@ public class AdapterWorkerManagement {
         adapter.startAdapter();
     }
 
-    public void stopSetAdapter (AdapterSetDescription adapterSetDescription) throws AdapterException {
+    public void stopSetAdapter(AdapterSetDescription adapterSetDescription) throws AdapterException {
         stopAdapter(adapterSetDescription);
     }
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
index a05e749..79c7607 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
@@ -47,7 +47,6 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
         this.adapterManagement = adapterManagement;
     }
 
-    // get all running instances
     @GET
     @JacksonSerialized
     @Path("/running")
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java
index 72df3df..026b8a7 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.connect.container.worker.utils;
 
-import org.apache.http.client.fluent.Request;
 import org.apache.streampipes.connect.adapter.model.generic.GenericAdapter;
 import org.apache.streampipes.connect.adapter.model.generic.GenericDataSetAdapter;
 import org.apache.streampipes.connect.adapter.model.generic.GenericDataStreamAdapter;
@@ -31,34 +30,9 @@ import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescript
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 public class AdapterUtils {
     private static final Logger logger = LoggerFactory.getLogger(AdapterUtils .class);
 
-    public static String stopPipeline(String url) {
-        logger.info("Send stopAdapter pipeline request on URL: " + url);
-
-        String result = "";
-        try {
-            result = Request.Get(url)
-                    .connectTimeout(1000)
-                    .socketTimeout(100000)
-                    .execute().returnContent().asString();
-        } catch (IOException e) {
-            e.printStackTrace();
-            result = e.getMessage();
-        }
-
-        logger.info("Successfully stopped pipeline");
-
-        return result;
-    }
-
-    public static String getUrl(String baseUrl, String pipelineId) {
-        return "http://" +baseUrl + "api/v2/pipelines/" + pipelineId + "/stopAdapter";
-    }
-
     public static IAdapter setAdapter(AdapterDescription adapterDescription) {
         IAdapter adapter = null;
 
diff --git a/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtilsTest.java b/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtilsTest.java
deleted file mode 100644
index 9f2c88e..0000000
--- a/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtilsTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.container.worker.utils;
-
-
-import com.github.tomakehurst.wiremock.client.WireMock;
-import com.github.tomakehurst.wiremock.junit.WireMockRule;
-import org.apache.streampipes.connect.container.worker.management.Mock;
-import org.junit.Rule;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class AdapterUtilsTest {
-    @Rule
-    public WireMockRule wireMockRule = new WireMockRule(Mock.PORT);
-
-
-    @Test
-    public void stopPipeline() {
-        String expected = "";
-
-        WireMock.stubFor(WireMock.get(WireMock.urlEqualTo("/"))
-                .willReturn(WireMock.aResponse()
-                        .withStatus(200)
-                        .withBody(expected)));
-
-
-        String result = AdapterUtils.stopPipeline(Mock.HOST + "/");
-
-        assertEquals(expected, result);
-
-        WireMock.verify(WireMock.getRequestedFor(WireMock.urlMatching("/")));
-    }
-
-}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index 81d8383..69ad2eb 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.rest.impl.connect;
 
-import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -80,7 +79,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
     @JacksonSerialized
     @Path("/{id}/stop")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response stopAdapter(@PathParam("id") String adapterId) throws NoServiceEndpointsAvailableException {
+    public Response stopAdapter(@PathParam("id") String adapterId) {
         try {
             managementService.stopStreamAdapter(adapterId);
             return ok(Notifications.success("Adapter started"));
@@ -124,7 +123,6 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
     @Produces(MediaType.APPLICATION_JSON)
     public Response getAllAdapters() {
         try {
-            // TODO get all invoced adapters
             List<AdapterDescription> result = managementService.getAllAdapterInstances();
 
             return ok(result);
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java
index 3d4bdd0..474c119 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -49,7 +48,7 @@ public class GuessResource extends AbstractAdapterResource<GuessManagement> {
   @JacksonSerialized
   @Path("/schema")
   @Produces(MediaType.APPLICATION_JSON)
-  public Response guessSchema(AdapterDescription adapterDescription, @PathParam("username") String userName) {
+  public Response guessSchema(AdapterDescription adapterDescription) {
 
       try {
           GuessSchema result = managementService.guessSchema(adapterDescription);
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
index b8a6ab1..1db6196 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
@@ -34,8 +34,7 @@ import javax.ws.rs.core.Response;
 @Path("/v2/connect/master/resolvable")
 public class RuntimeResolvableResource extends AbstractAdapterResource<WorkerAdministrationManagement> {
 
-    private static final String SP_NS =  "https://streampipes.org/vocabulary/v1/";
-    private WorkerUrlProvider workerUrlProvider;
+    private final WorkerUrlProvider workerUrlProvider;
 
     public RuntimeResolvableResource() {
         super(WorkerAdministrationManagement::new);
@@ -48,7 +47,6 @@ public class RuntimeResolvableResource extends AbstractAdapterResource<WorkerAdm
     @Produces(MediaType.APPLICATION_JSON)
     @Consumes(MediaType.APPLICATION_JSON)
     public Response fetchConfigurations(@PathParam("id") String appId,
-                                        @PathParam("username") String username,
                                         RuntimeOptionsRequest runtimeOptionsRequest) {
 
         // TODO add solution for formats
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
index 8b6d4cd..d5fb288 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
@@ -94,9 +94,6 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
                            @PathParam("runningInstanceId") String runningInstanceId) {
         String responseMessage = "Instance of set id: " + elementId  + " with instance id: "+ runningInstanceId + " successfully started";
 
-//        String workerUrl = new Utils().getWorkerUrlById(elementId);
-//        String newUrl = Utils.addUserNameToApi(workerUrl, username);
-
         try {
             managementService.detachAdapter(elementId, runningInstanceId);
         } catch (AdapterException | NoServiceEndpointsAvailableException e) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
index 27c7a94..d332996 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
@@ -48,11 +48,8 @@ public class WorkerAdministrationResource extends AbstractSharedRestInterface {
     @JacksonSerialized
     @Produces(MediaType.APPLICATION_JSON)
     public Response addWorkerContainer(List<AdapterDescription> availableAdapterDescription) {
-        // Change this to List<AdapterDescription>
-        // How do I store the available AdapterDescriptions when there is no ConnectWorkerContainer
-//        LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
+
         this.workerAdministrationManagement.register(availableAdapterDescription);
-        System.out.println(availableAdapterDescription);
 
         return ok(Notifications.success("Worker Container successfully added"));
     }
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterDescriptionBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterDescriptionBuilder.java
index 7df2e40..db18910 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterDescriptionBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterDescriptionBuilder.java
@@ -30,13 +30,11 @@ public abstract class AdapterDescriptionBuilder<BU extends
 
   protected AdapterDescriptionBuilder(String id, T element) {
     super(id, element);
-//    this.elementDescription.setAdapterId(id);
   }
 
   protected AdapterDescriptionBuilder(String id, String label, String description,
                                    T adapterTypeInstance) {
     super(id, label, description, adapterTypeInstance);
-//    this.elementDescription.setAdapterId(id);
   }
 
   public AdapterDescriptionBuilder<BU, T> category(AdapterType... categories) {
@@ -47,4 +45,11 @@ public abstract class AdapterDescriptionBuilder<BU extends
                     .collect(Collectors.toList()));
     return me();
   }
+
+  public AdapterDescriptionBuilder<BU, T> elementId(String elementId) {
+    this.elementDescription.setElementId(elementId);
+    return me();
+  }
+
+
 }

[incubator-streampipes] 03/04: [STREAMPIPES-438] Fix data set adapters

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 5897ade5217c449991659c93547b0533a068eb6a
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 11 14:45:21 2021 +0200

    [STREAMPIPES-438] Fix data set adapters
---
 .../master/management/AdapterMasterManagement.java | 46 ++++++-----
 .../master/management/SourcesManagement.java       | 90 +++++++---------------
 .../master/management/WorkerRestClient.java        |  4 +-
 .../connect/container/master/util/WorkerPaths.java | 15 +++-
 .../master/management/SourcesManagementTest.java   | 29 +------
 .../worker/rest/AdapterWorkerResource.java         |  8 +-
 .../manager/setup/CouchDbInstallationStep.java     |  3 +-
 .../rest/impl/connect/SourcesResource.java         | 33 +-------
 .../tests/pipelineElement/AllPipelineElements.ts   |  1 -
 9 files changed, 75 insertions(+), 154 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 5ada040..a6d28f6 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -23,7 +23,7 @@ import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.connect.adapter.GroundingService;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
-import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
 import org.apache.streampipes.manager.storage.UserService;
 import org.apache.streampipes.manager.verification.DataStreamVerifier;
 import org.apache.streampipes.model.SpDataStream;
@@ -36,14 +36,13 @@ import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorageCache;
 import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 import org.apache.streampipes.storage.management.StorageManager;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.apache.streampipes.manager.storage.UserManagementService.getUserService;
 
@@ -92,7 +91,7 @@ public class AdapterMasterManagement {
     }
 
     // Create stream
-    SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
+    SpDataStream storedDescription = new SourcesManagement().createAdapterDataStream(ad);
     storedDescription.setCorrespondingAdapterId(elementId);
     installDataSource(storedDescription, username);
     LOG.info("Install source (source URL: {} in backend", ad.getElementId());
@@ -116,37 +115,43 @@ public class AdapterMasterManagement {
   }
 
   /**
-   * First the adapter is stopped removed, then the according data source is deleted
+   * First the adapter is stopped and removed, then the corresponding data source is deleted
    * @param elementId
    * @throws AdapterException
    */
   public void deleteAdapter(String elementId) throws AdapterException {
-    // IF Stream adapter delete it
-    boolean isStreamAdapter = isStreamAdapter(elementId);
 
-    if (isStreamAdapter) {
+    // Stop stream adapter
+    if (isStreamAdapter(elementId)) {
       try {
         stopStreamAdapter(elementId);
       } catch (AdapterException e) {
-        LOG.info("Could not stop adapter: " + elementId);
-        LOG.info(e.toString());
+        LOG.info("Could not stop adapter: " + elementId, e);
       }
     }
 
-
+    // Delete adapter
     AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
-    String username = ad.getUserName();
     adapterInstanceStorage.deleteAdapter(elementId);
     LOG.info("Successfully deleted adapter: " + elementId);
 
+    // Delete data stream
     UserService userService = getUserService();
     IPipelineElementDescriptionStorageCache requestor = StorageManager.INSTANCE.getPipelineElementStorage();
-
-    if (requestor.getDataStreamById(ad.getElementId()) != null) {
-      requestor.deleteDataStream(requestor.getDataStreamById(ad.getElementId()));
-      userService.deleteOwnSource(username, ad.getElementId());
+    List<SpDataStream> streamsToDelete = requestor
+            .getAllDataStreams()
+            .stream()
+            .filter(spDataStream -> spDataStream.getCorrespondingAdapterId().equals(elementId))
+            .collect(Collectors.toList());
+    String username = ad.getUserName();
+    if (streamsToDelete.size() > 0) {
+      SpDataStream streamToDelete = streamsToDelete.get(0);
+      requestor.deleteDataStream(streamToDelete);
+      userService.deleteOwnSource(username, streamToDelete.getElementId());
       requestor.refreshDataSourceCache();
+      LOG.info("Successfully deleted data stream: " + streamToDelete.getElementId());
     }
+
   }
 
   public List<AdapterDescription> getAllAdapterInstances() throws AdapterException {
@@ -200,7 +205,7 @@ public class AdapterMasterManagement {
 
       try {
         // Find endpoint to start adapter on
-        String baseUrl = findEndpointUrl(ad.getAppId());
+        String baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId());
 
         // Update selected endpoint URL of adapter
         ad.setSelectedEndpointUrl(baseUrl);
@@ -242,11 +247,4 @@ public class AdapterMasterManagement {
     return new AdapterInstanceStorageImpl();
   }
 
-  private String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException {
-    SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
-    String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl();
-    URI uri = new URI(endpointUrl);
-    String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
-    return baseUrl;
-  }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index ee54fdd..8e3e284 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -22,9 +22,7 @@ import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableExce
 import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
-import org.apache.streampipes.container.html.JSONGenerator;
-import org.apache.streampipes.container.html.model.DataSourceDescriptionHtml;
-import org.apache.streampipes.container.html.model.Description;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -37,9 +35,7 @@ import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -47,11 +43,11 @@ public class SourcesManagement {
 
     private Logger logger = LoggerFactory.getLogger(SourcesManagement.class);
 
-    private AdapterInstanceStorageImpl adapterStorage;
+    private AdapterInstanceStorageImpl adapterInstanceStorage;
     private WorkerUrlProvider workerUrlProvider;
 
     public SourcesManagement(AdapterInstanceStorageImpl adapterStorage) {
-      this.adapterStorage = adapterStorage;
+      this.adapterInstanceStorage = adapterStorage;
       this.workerUrlProvider = new WorkerUrlProvider();
     }
 
@@ -59,23 +55,26 @@ public class SourcesManagement {
        this(new AdapterInstanceStorageImpl());
     }
 
-    public void addAdapter(String streamId,
-                           SpDataSet dataSet) throws AdapterException, NoServiceEndpointsAvailableException {
+    public void addSetAdapter(SpDataSet dataSet) throws AdapterException, NoServiceEndpointsAvailableException {
 
+        AdapterSetDescription ad = (AdapterSetDescription) getAndDecryptAdapter(dataSet.getCorrespondingAdapterId());
+        ad.setDataSet(dataSet);
+        ad.setElementId(ad.getElementId() + "/streams/" + dataSet.getDatasetInvocationId());
 
-        String newUrl = getAdapterUrl(streamId);
-        AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
-        adapterDescription.setDataSet(dataSet);
-
-        String newId = adapterDescription.getElementId() + "/streams/" + dataSet.getDatasetInvocationId();
-        adapterDescription.setElementId(newId);
-
-        AdapterSetDescription decryptedAdapterDescription =
-                (AdapterSetDescription) new Cloner().adapterDescription(adapterDescription);
-
-        WorkerRestClient.invokeSetAdapter(newUrl, decryptedAdapterDescription);
+        try {
+            String baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId());
+            WorkerRestClient.invokeSetAdapter(baseUrl, ad);
+        } catch (URISyntaxException e) {
+            e.printStackTrace();
+        }
     }
 
+    /**
+     * @param streamId
+     * @param runningInstanceId
+     * @throws AdapterException
+     * @throws NoServiceEndpointsAvailableException
+     */
     public void detachAdapter(String streamId,
                               String runningInstanceId) throws AdapterException, NoServiceEndpointsAvailableException {
         AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
@@ -89,7 +88,7 @@ public class SourcesManagement {
 
     private String getAdapterUrl(String streamId) throws NoServiceEndpointsAvailableException {
         String appId = "";
-        List<AdapterDescription> adapterDescriptions = this.adapterStorage.getAllAdapters();
+        List<AdapterDescription> adapterDescriptions = this.adapterInstanceStorage.getAllAdapters();
         for (AdapterDescription ad : adapterDescriptions) {
             if (ad.getElementId().contains(streamId)) {
                 appId = ad.getAppId();
@@ -100,43 +99,9 @@ public class SourcesManagement {
 
     }
 
-    public String getAllAdaptersInstallDescription() throws AdapterException {
-
-        List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
-        List<Description> allAdapterDescriptions = new ArrayList<>();
-
-        for (AdapterDescription ad : allAdapters) {
-            URI uri;
-            String uriString = null;
-            try {
-                uriString = ad.getElementId();
-                uri = new URI(uriString);
-            } catch (URISyntaxException e) {
-                logger.error("URI for the sources endpoint is not correct: " + uriString, e);
-                throw new AdapterException("Incorrect source URI: " +uriString);
-            }
-
-
-            List<Description> streams = new ArrayList<>();
-            Description d = new Description(ad.getName(), "", uri.toString());
-            d.setType("set");
-            streams.add(d);
-            DataSourceDescriptionHtml dsd = new DataSourceDescriptionHtml("Adapter Stream",
-                    "This stream is generated by an StreamPipes Connect adapter. ID of adapter: " + ad.getElementId(), uri.toString(), streams);
-            dsd.setType("source");
-            dsd.setAppId(ad.getAppId());
-            dsd.setEditable(!(ad.isInternallyManaged()));
-            allAdapterDescriptions.add(dsd);
-        }
-
-        JSONGenerator json = new JSONGenerator(allAdapterDescriptions);
-
-        return json.buildJson();
-    }
-
     private AdapterDescription getAdapterDescriptionById(String id) {
         AdapterDescription adapterDescription = null;
-        List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
+        List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
         for (AdapterDescription a : allAdapters) {
             if (a.getElementId().equals(id)) {
                 adapterDescription = a;
@@ -149,10 +114,7 @@ public class SourcesManagement {
         return decryptedAdapterDescription;
     }
 
-    public SpDataStream getAdapterDataStream(String id) throws AdapterException {
-
-        // get all Adapters and check id
-        AdapterDescription adapterDescription = getAdapterDescriptionById(id);
+    public SpDataStream createAdapterDataStream(AdapterDescription adapterDescription) {
 
         SpDataStream ds;
         if (adapterDescription instanceof AdapterSetDescription) {
@@ -169,9 +131,15 @@ public class SourcesManagement {
 
         ds.setName(adapterDescription.getName());
         ds.setDescription(adapterDescription.getDescription());
-        ds.setCorrespondingAdapterId(adapterDescription.getAppId());
+        ds.setCorrespondingAdapterId(adapterDescription.getElementId());
         ds.setInternallyManaged(true);
 
         return ds;
     }
+
+    private AdapterDescription getAndDecryptAdapter(String adapterId) {
+        AdapterDescription adapter = this.adapterInstanceStorage.getAdapter(adapterId);
+        return new AdapterEncryptionService(new Cloner().adapterDescription(adapter)).decrypt();
+    }
+
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 0026b60..2240797 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -65,9 +65,9 @@ public class WorkerRestClient {
         updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false);
     }
 
-    public static void invokeSetAdapter(String baseUrl,
+    public static void invokeSetAdapter(String endpointUrl,
                                         AdapterSetDescription adapterSetDescription) throws AdapterException {
-        String url = baseUrl + WorkerPaths.getSetInvokePath();
+        String url = endpointUrl + WorkerPaths.getSetInvokePath();
 
         startAdapter(url, adapterSetDescription);
     }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
index 15572ee..f3af79b 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
@@ -17,6 +17,13 @@
  */
 package org.apache.streampipes.connect.container.master.util;
 
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
 public class WorkerPaths {
 
   private static final String WorkerMainPath = "/api/v1/worker";
@@ -49,7 +56,13 @@ public class WorkerPaths {
     return WorkerMainPath + "/guess/schema";
   }
 
-
+  public static String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException {
+    SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
+    String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl();
+    URI uri = new URI(endpointUrl);
+    String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
+    return baseUrl;
+  }
 
 
 }
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
index 00fadf9..fb9db01 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
@@ -35,7 +35,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.*;
@@ -61,7 +60,7 @@ public class SourcesManagementTest {
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
         doNothing().when(WorkerRestClient.class, "invokeSetAdapter", anyString(), any());
 
-        sourcesManagement.addAdapter(ID, new SpDataSet());
+        sourcesManagement.addSetAdapter(new SpDataSet());
 
         verify(adapterStorage, times(1)).getAllAdapters();
         verifyStatic(WorkerRestClient.class, times(1));
@@ -107,32 +106,6 @@ public class SourcesManagementTest {
         sourcesManagement.detachAdapter( ID, "id1");
     }
 
-    @Ignore
-    @Test
-    public void getAllAdaptersInstallDescriptionSuccess() throws Exception {
-
-        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
-        when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
-
-        SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-
-        String result = sourcesManagement.getAllAdaptersInstallDescription();
-        assertEquals(getJsonString(), result);
-
-    }
-
-    @Ignore
-    @Test(expected = AdapterException.class)
-    public void getAllAdaptersInstallDescriptionFail() throws Exception {
-        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
-        AdapterDescription adapterDescription = new GenericAdapterSetDescription();
-        when(adapterStorage.getAllAdapters()).thenReturn(Arrays.asList(adapterDescription));
-        SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-
-        sourcesManagement.getAllAdaptersInstallDescription();
-
-    }
-
     private List<AdapterDescription> getAdapterDescriptionList() {
         GenericAdapterSetDescription adapterSetDescription = new GenericAdapterSetDescription();
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
index 79c7607..e423590 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
@@ -105,11 +105,11 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
         try {
             adapterManagement.invokeSetAdapter(adapterSetDescription);
         } catch (AdapterException e) {
-            logger.error("Error while starting adapter with id " + adapterSetDescription.getUri(), e);
+            logger.error("Error while starting adapter with id " + adapterSetDescription.getElementId(), e);
             return ok(Notifications.error(e.getMessage()));
         }
 
-        String responseMessage = "Set adapter with id " + adapterSetDescription.getUri() + " successfully started";
+        String responseMessage = "Set adapter with id " + adapterSetDescription.getElementId() + " successfully started";
 
         logger.info(responseMessage);
         return ok(Notifications.success(responseMessage));
@@ -125,11 +125,11 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
         try {
              adapterManagement.stopSetAdapter(adapterSetDescription);
         } catch (AdapterException e) {
-            logger.error("Error while stopping adapter with id " + adapterSetDescription.getUri(), e);
+            logger.error("Error while stopping adapter with id " + adapterSetDescription.getElementId(), e);
             return ok(Notifications.error(e.getMessage()));
         }
 
-        String responseMessage = "Set adapter with id " + adapterSetDescription.getUri() + " successfully stopped";
+        String responseMessage = "Set adapter with id " + adapterSetDescription.getElementId() + " successfully stopped";
 
         logger.info(responseMessage);
         return ok(Notifications.success(responseMessage));
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index e481c14..444d54f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -31,8 +31,7 @@ import java.util.*;
 
 public class CouchDbInstallationStep implements InstallationStep {
 
-    private static List<String> initRdfEndpointPorts =
-            Collections.singletonList("8099/api/v1/master/sources/");
+    private static List<String> initRdfEndpointPorts = new ArrayList<>();
     private static final String initRdfEndpointHost = "http://localhost:";
 
     private static final String PREPARING_NOTIFICATIONS_TEXT = "Preparing database " +
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
index d5fb288..b792711 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
@@ -23,8 +23,6 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.SourcesManagement;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.message.Notifications;
-import org.apache.streampipes.rest.shared.annotation.GsonWithIds;
-import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,44 +39,17 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
         super(SourcesManagement::new);
     }
 
-    @GET
-    @Produces(MediaType.APPLICATION_JSON)
-    @GsonWithIds
-    public Response getAllAdaptersInstallDescription() {
-
-        try {
-            String resultingJson = managementService.getAllAdaptersInstallDescription();
-            return ok(resultingJson);
-        } catch (AdapterException e) {
-            LOG.error("Error while getting all adapter descriptions", e);
-            return fail();
-        }
-    }
-
-    @GET
-    @Path("/{id}")
-    @JacksonSerialized
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response getAdapterDataSource(@PathParam("id") String id) {
-        try {
-            return ok(managementService.getAdapterDataStream(id));
-        } catch (AdapterException e) {
-            LOG.error("Error while retrieving DataSourceDescription with id: " + id);
-            return fail();
-        }
-    }
-
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
     @Path("/{streamId}")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response addAdapter(@PathParam("streamId") String elementId,
+    public Response addSetAdapter(@PathParam("streamId") String streamId,
                                SpDataSet dataSet) {
 
         String responseMessage = "Instance of data set " + dataSet.getElementId() + " successfully started";
 
         try {
-            managementService.addAdapter(elementId,  dataSet);
+            managementService.addSetAdapter(dataSet);
         } catch (AdapterException | NoServiceEndpointsAvailableException e) {
             LOG.error("Could not set data set instance: " + dataSet.getElementId(), e);
             return ok(Notifications.error("Could not set data set instance: " + dataSet.getElementId()));
diff --git a/ui/cypress/tests/pipelineElement/AllPipelineElements.ts b/ui/cypress/tests/pipelineElement/AllPipelineElements.ts
index b3fdac4..70a2c05 100644
--- a/ui/cypress/tests/pipelineElement/AllPipelineElements.ts
+++ b/ui/cypress/tests/pipelineElement/AllPipelineElements.ts
@@ -18,7 +18,6 @@
 
 import { ProcessingElementTestUtils } from '../../support/utils/ProcessingElementTestUtils';
 import { ProcessorTest } from '../../support/model/ProcessorTest';
-import { AdapterUtils } from '../../support/utils/AdapterUtils';
 
 const allTests = Cypress.env('processingElements');
 

[incubator-streampipes] 04/04: [STREAMPIPES-438] Remove data stream source id from backend

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 3bdfe1d09762efa679f01f4ed48cd3e23e7c51bb
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 11 14:59:49 2021 +0200

    [STREAMPIPES-438] Remove data stream source id from backend
---
 .../container/master/health/AdapterHealthCheck.java        | 14 ++++++++------
 .../manager/execution/http/PipelineExecutor.java           |  6 +++---
 .../streampipes/rest/impl/connect/SourcesResource.java     |  5 ++---
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 7063649..132b769 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -79,12 +79,14 @@ public class AdapterHealthCheck {
         Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
         allRunningInstancesAdapterDescription.values().forEach(ad -> {
             String selectedEndpointUrl = ad.getSelectedEndpointUrl();
-            if (groupByWorker.containsKey(selectedEndpointUrl)) {
-                groupByWorker.get(selectedEndpointUrl).add(ad);
-            } else {
-                List<AdapterDescription> tmp = new ArrayList<>();
-                tmp.add(ad);
-                groupByWorker.put(selectedEndpointUrl, tmp);
+            if (selectedEndpointUrl != null) {
+                if (groupByWorker.containsKey(selectedEndpointUrl)) {
+                    groupByWorker.get(selectedEndpointUrl).add(ad);
+                } else {
+                    List<AdapterDescription> tmp = new ArrayList<>();
+                    tmp.add(ad);
+                    groupByWorker.put(selectedEndpointUrl, tmp);
+                }
             }
         });
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index b4492bc..9901619 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -157,17 +157,17 @@ public class PipelineExecutor {
   private String findSelectedEndpoint(SpDataSet ds) throws NoServiceEndpointsAvailableException {
     String appId = ds.getAppId() != null ? ds.getAppId() : ds.getCorrespondingAdapterId();
     if (ds.isInternallyManaged()) {
-      return getConnectMasterSourcesUrl(ds.getElementId());
+      return getConnectMasterSourcesUrl();
     } else {
       return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.DATA_SET)
               .getEndpointResourceUrl();
     }
   }
 
-  private String getConnectMasterSourcesUrl(String elementId) throws NoServiceEndpointsAvailableException {
+  private String getConnectMasterSourcesUrl() throws NoServiceEndpointsAvailableException {
     List<String> connectMasterEndpoints = SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints(DefaultSpServiceGroups.CORE, true, Collections.singletonList(DefaultSpServiceTags.CONNECT_MASTER.asString()));
     if (connectMasterEndpoints.size() > 0) {
-      return connectMasterEndpoints.get(0) + GlobalStreamPipesConstants.CONNECT_MASTER_SOURCES_ENDPOINT + elementId;
+      return connectMasterEndpoints.get(0) + GlobalStreamPipesConstants.CONNECT_MASTER_SOURCES_ENDPOINT;
     } else {
       throw new NoServiceEndpointsAvailableException("Could not find any available connect master service endpoint");
     }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
index b792711..00293d4 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
@@ -41,10 +41,9 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
 
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
-    @Path("/{streamId}")
+    @Path("/")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response addSetAdapter(@PathParam("streamId") String streamId,
-                               SpDataSet dataSet) {
+    public Response addSetAdapter(SpDataSet dataSet) {
 
         String responseMessage = "Instance of data set " + dataSet.getElementId() + " successfully started";