You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/07 10:27:48 UTC

[incubator-streampipes] 02/03: Working on simplifying adapters

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit c87cec6ff0fa7ef1cecee2d5fa321d152a5c597c
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Oct 6 19:15:37 2021 +0200

    Working on simplifying adapters
---
 .../master/health/AdapterHealthCheck.java          |   4 +-
 .../master/management/AdapterMasterManagement.java |  50 +++++----
 .../AdapterTemplateMasterManagement.java           |   1 +
 .../master/management/DescriptionManagement.java   |  23 ++--
 .../master/management/SourcesManagement.java       |   8 +-
 .../management/WorkerAdministrationManagement.java |  28 ++++-
 .../master/management/WorkerRestClient.java        |   6 +-
 .../master/management/WorkerUrlProvider.java       |  15 +--
 .../management/AdapterMasterManagementTest.java    |  15 ++-
 .../AdapterTemplateMasterManagementTest.java       |   1 +
 .../master/management/SourcesManagementTest.java   |  15 ++-
 .../worker/init/AdapterWorkerContainer.java        |   2 +-
 .../init/ConnectWorkerDescriptionProvider.java     |   3 +
 .../init/ConnectWorkerRegistrationService.java     |   5 +-
 .../extensions/ExtensionsModelSubmitter.java       |   2 +-
 .../model/connect/adapter/AdapterDescription.java  |   3 +
 .../connect/adapter/AdapterDescriptionList.java    |   1 +
 .../apache/streampipes/rest/ResetManagement.java   |   2 +-
 .../rest/impl/connect/AdapterResource.java         |   8 +-
 .../rest/impl/connect/AdapterTemplateResource.java |   1 +
 .../rest/impl/connect/DescriptionResource.java     |   5 +-
 .../rest/impl/connect/SourcesResource.java         |   6 +-
 .../rest/impl/connect/WelcomePageMaster.java       |   2 +-
 .../impl/connect/WorkerAdministrationResource.java |   2 +-
 .../storage/api/IAdapterTemplateStorage.java       |   1 +
 .../streampipes/storage/api/INoSqlStorage.java     |   2 -
 .../storage/couchdb/CouchDbStorageManager.java     |   7 +-
 ...mpl.java => AdapterDescriptionStorageImpl.java} |  12 +--
 ...geImpl.java => AdapterInstanceStorageImpl.java} |   8 +-
 .../couchdb/impl/AdapterTemplateStorageImpl.java   |   3 +-
 .../streampipes/storage/couchdb/utils/Utils.java   |   8 +-
 .../data-marketplace/data-marketplace.component.ts |  26 ++---
 .../connect/services/data-marketplace.service.ts   | 119 ++++++++++-----------
 33 files changed, 201 insertions(+), 193 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 89bdd64..f343c8b 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -22,7 +22,7 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.storage.api.IAdapterStorage;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 
 import java.util.List;
 
@@ -33,7 +33,7 @@ public class AdapterHealthCheck {
 
     public void checkAndRestoreAdapters() {
         AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
-        IAdapterStorage adapterStorage = new AdapterStorageImpl();
+        IAdapterStorage adapterStorage = new AdapterInstanceStorageImpl();
 
 
         // Get all adapters
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 77342c5..ab70e8e 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -35,7 +35,8 @@ import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.util.Cloner;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorageCache;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterDescriptionStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 import org.apache.streampipes.storage.management.StorageManager;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 import org.slf4j.Logger;
@@ -53,20 +54,21 @@ public class AdapterMasterManagement {
 
   private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
 
-  private IAdapterStorage adapterStorage;
+  private IAdapterStorage adapterInstanceStorage;
   private WorkerUrlProvider workerUrlProvider;
 
   public AdapterMasterManagement() {
-    this.adapterStorage = getAdapterStorage();
+    this.adapterInstanceStorage = getAdapterInstanceStorage();
+    this.adapterDescriptionStorage = new AdapterDescriptionStorageImpl();
     this.workerUrlProvider = new WorkerUrlProvider();
   }
 
   public AdapterMasterManagement(IAdapterStorage adapterStorage) {
-    this.adapterStorage = adapterStorage;
+    this.adapterInstanceStorage = adapterStorage;
   }
 
   public void startAllStreamAdapters(ConnectWorkerContainer connectWorkerContainer) throws AdapterException, NoServiceEndpointsAvailableException {
-    IAdapterStorage adapterStorage = getAdapterStorage();
+    IAdapterStorage adapterStorage = getAdapterInstanceStorage();
     List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
 
     for (AdapterDescription ad : allAdapters) {
@@ -97,13 +99,12 @@ public class AdapterMasterManagement {
     ad.setElementId(ad.getElementId() + ":" + uuid);
     ad.setCreatedAt(System.currentTimeMillis());
     ad.setSelectedEndpointUrl(endpointUrl);
-    ad.setCorrespondingServiceGroup(new WorkerUrlProvider().getWorkerServiceGroup(ad.getAppId()));
 
     AdapterDescription encryptedAdapterDescription =
             new AdapterEncryptionService(new Cloner().adapterDescription(ad)).encrypt();
 
     // store in db
-    String adapterId = adapterStorage.storeAdapter(encryptedAdapterDescription);
+    String adapterId = adapterInstanceStorage.storeAdapter(encryptedAdapterDescription);
 
     // start when stream adapter
     if (ad instanceof AdapterStreamDescription) {
@@ -129,7 +130,7 @@ public class AdapterMasterManagement {
   }
 
   public AdapterDescription getAdapter(String id) throws AdapterException {
-    List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
+    List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
 
     if (allAdapters != null && id != null) {
       for (AdapterDescription ad : allAdapters) {
@@ -161,9 +162,9 @@ public class AdapterMasterManagement {
     }
 
 
-    AdapterDescription ad = adapterStorage.getAdapter(elementId);
+    AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
     String username = ad.getUserName();
-    adapterStorage.deleteAdapter(elementId);
+    adapterInstanceStorage.deleteAdapter(elementId);
     LOG.info("Successfully deleted adapter: " + elementId);
 
     UserService userService = getUserService();
@@ -176,9 +177,20 @@ public class AdapterMasterManagement {
     }
   }
 
-  public List<AdapterDescription> getAllAdapters() throws AdapterException {
+  public List<AdapterDescription> getAllAdapterInstances() throws AdapterException {
 
-    List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
+    List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
+
+    if (allAdapters == null) {
+      throw new AdapterException("Could not get all adapters");
+    }
+
+    return allAdapters;
+  }
+
+  public List<AdapterDescription> getAllAdapterDescriptions() throws AdapterException {
+
+    List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
 
     if (allAdapters == null) {
       throw new AdapterException("Could not get all adapters");
@@ -187,7 +199,7 @@ public class AdapterMasterManagement {
     return allAdapters;
   }
 
-  public void stopSetAdapter(String adapterId, String baseUrl, AdapterStorageImpl adapterStorage) throws AdapterException {
+  public void stopSetAdapter(String adapterId, String baseUrl, AdapterInstanceStorageImpl adapterStorage) throws AdapterException {
 
     AdapterSetDescription ad = (AdapterSetDescription) adapterStorage.getAdapter(adapterId);
 
@@ -195,7 +207,7 @@ public class AdapterMasterManagement {
   }
 
   public void stopStreamAdapter(String elementId) throws AdapterException {
-    AdapterDescription ad = adapterStorage.getAdapter(elementId);
+    AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
 
     if (!isStreamAdapter(elementId)) {
       throw new AdapterException("Adapter " + elementId + "is not a stream adapter.");
@@ -207,7 +219,7 @@ public class AdapterMasterManagement {
   public void startStreamAdapter(String elementId) throws AdapterException {
     // TODO ensure that adapter is not started twice
 
-    AdapterDescription ad = adapterStorage.getAdapter(elementId);
+    AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
     try {
       String endpointUrl = findEndpointUrl(ad);
       URI uri = new URI(endpointUrl);
@@ -216,7 +228,7 @@ public class AdapterMasterManagement {
         throw new AdapterException("Adapter " + elementId + "is not a stream adapter.");
       } else {
         ad.setSelectedEndpointUrl(baseUrl);
-        adapterStorage.updateAdapter(ad);
+        adapterInstanceStorage.updateAdapter(ad);
 
         AdapterDescription decryptedAdapterDescription =
                 new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
@@ -232,7 +244,7 @@ public class AdapterMasterManagement {
   }
 
   public boolean isStreamAdapter(String id) {
-    AdapterDescription adapterDescription = adapterStorage.getAdapter(id);
+    AdapterDescription adapterDescription = adapterInstanceStorage.getAdapter(id);
     return isStreamAdapter(adapterDescription);
   }
 
@@ -240,8 +252,8 @@ public class AdapterMasterManagement {
     return adapterDescription instanceof AdapterStreamDescription;
   }
 
-  private IAdapterStorage getAdapterStorage() {
-    return new AdapterStorageImpl();
+  private IAdapterStorage getAdapterInstanceStorage() {
+    return new AdapterInstanceStorageImpl();
   }
 
   private String findEndpointUrl(AdapterDescription adapterDescription) throws NoServiceEndpointsAvailableException {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java
index b3df802..fec1eb9 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagement.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.UUID;
 
 
+@Deprecated
 public class AdapterTemplateMasterManagement {
 
     private IAdapterTemplateStorage adapterTemplateStorage;
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
index b2e60a0..49f23b2 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
@@ -19,14 +19,15 @@
 package org.apache.streampipes.connect.container.master.management;
 
 import org.apache.streampipes.connect.adapter.AdapterRegistry;
-import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.api.IFormat;
+import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
 import org.apache.streampipes.model.connect.grounding.FormatDescriptionList;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescriptionList;
 import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.storage.api.IAdapterStorage;
+import org.apache.streampipes.storage.couchdb.impl.AdapterDescriptionStorageImpl;
 import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
 
 import java.util.List;
@@ -35,6 +36,7 @@ import java.util.Optional;
 
 public class DescriptionManagement {
 
+    @Deprecated
     public ProtocolDescriptionList getProtocols() {
         ConnectionWorkerContainerStorageImpl connectionWorkerContainerStorage = new ConnectionWorkerContainerStorageImpl();
 
@@ -61,22 +63,13 @@ public class DescriptionManagement {
         return result;
     }
 
-    public AdapterDescriptionList getAdapters() {
-        ConnectionWorkerContainerStorageImpl connectionWorkerContainerStorage = new ConnectionWorkerContainerStorageImpl();
-
-        List<ConnectWorkerContainer> allWorkerContainter = connectionWorkerContainerStorage.getAllConnectWorkerContainers();
-
-        AdapterDescriptionList result = new AdapterDescriptionList();
-
-        for (ConnectWorkerContainer connectWorkerContainer : allWorkerContainter) {
-           result.getList().addAll(connectWorkerContainer.getAdapters());
-        }
-
-        return result;
+    public List<AdapterDescription> getAdapters() {
+        IAdapterStorage adapterStorage = new AdapterDescriptionStorageImpl();
+        return adapterStorage.getAllAdapters();
     }
 
     public Optional<AdapterDescription> getAdapter(String id) {
-        return getAdapters().getList().stream()
+        return getAdapters().stream()
                 .filter(desc -> desc.getAppId().equals(id))
                 .findFirst();
     }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index e48520e..f362473 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -33,7 +33,7 @@ import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.util.Cloner;
 import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,17 +47,17 @@ public class SourcesManagement {
 
     private Logger logger = LoggerFactory.getLogger(SourcesManagement.class);
 
-    private AdapterStorageImpl adapterStorage;
+    private AdapterInstanceStorageImpl adapterStorage;
     private WorkerUrlProvider workerUrlProvider;
     private String connectHost = null;
 
-    public SourcesManagement(AdapterStorageImpl adapterStorage) {
+    public SourcesManagement(AdapterInstanceStorageImpl adapterStorage) {
       this.adapterStorage = adapterStorage;
       this.workerUrlProvider = new WorkerUrlProvider();
     }
 
     public SourcesManagement() {
-       this(new AdapterStorageImpl());
+       this(new AdapterInstanceStorageImpl());
     }
 
     public void addAdapter(String streamId, SpDataSet dataSet) throws AdapterException, NoServiceEndpointsAvailableException {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
index 2df8f95..a3be118 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
@@ -19,11 +19,15 @@
 package org.apache.streampipes.connect.container.master.management;
 
 import org.apache.streampipes.connect.container.master.health.AdapterHealthCheck;
-import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.storage.api.IAdapterStorage;
+import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
 import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 public class WorkerAdministrationManagement {
 
     private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
@@ -31,18 +35,34 @@ public class WorkerAdministrationManagement {
     private ConnectionWorkerContainerStorageImpl connectionWorkerContainerStorage;
     private AdapterMasterManagement adapterMasterManagement;
 
+    private IAdapterStorage adapterStorage;
+
     private AdapterHealthCheck adapterHealthCheck;
 
     public WorkerAdministrationManagement() {
         this.connectionWorkerContainerStorage = new ConnectionWorkerContainerStorageImpl();
         this.adapterMasterManagement = new AdapterMasterManagement();
         this.adapterHealthCheck = new AdapterHealthCheck();
+        this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterStorage();
     }
 
-    public void register(ConnectWorkerContainer connectWorker) {
-        // TODO how do I register the protocols and adapters of a worker?
+    public void register(List<AdapterDescription> availableAdapterDescription) {
+
+        List<AdapterDescription> alreadyRegisteredAdapters = this.adapterStorage.getAllAdapters();
+
+        availableAdapterDescription.forEach(adapterDescription -> {
+
+            // only install once adapter description per service group
+            boolean alreadyInstalled = alreadyRegisteredAdapters
+                    .stream()
+                    .anyMatch(a -> a.getAppId().equals(adapterDescription.getAppId()) && a.getCorrespondingServiceGroup().equals(adapterDescription.getCorrespondingServiceGroup()));
+            if (!alreadyInstalled) {
+                this.adapterStorage.storeAdapter(adapterDescription);
+            }
+        });
+
+//        this.adapterHealthCheck.checkAndRestoreAdapters();
 
-        this.adapterHealthCheck.checkAndRestoreAdapters();
 
         // Check if already registered
 
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 2043732..a873f8c 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -31,7 +31,7 @@ import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
 import org.apache.streampipes.model.util.Cloner;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.storage.api.IAdapterStorage;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +60,7 @@ public class WorkerRestClient {
     public static void stopStreamAdapter(String baseUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
         String url = baseUrl + WorkerPaths.getStreamStopPath();
 
-        AdapterDescription ad = getAdapterDescriptionById(new AdapterStorageImpl(), adapterStreamDescription.getElementId());
+        AdapterDescription ad = getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId());
 
         stopAdapter(ad, url);
         updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false);
@@ -196,7 +196,7 @@ public class WorkerRestClient {
     }
 
 
-    private static AdapterDescription getAdapterDescriptionById(AdapterStorageImpl adapterStorage, String id) {
+    private static AdapterDescription getAdapterDescriptionById(AdapterInstanceStorageImpl adapterStorage, String id) {
         AdapterDescription adapterDescription = null;
         List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
         for (AdapterDescription a : allAdapters) {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java
index 2b93972..9a0648e 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java
@@ -28,8 +28,6 @@ import org.apache.streampipes.storage.api.IConnectWorkerContainerStorage;
 import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
-import java.util.List;
-
 public class WorkerUrlProvider {
 
   private IConnectWorkerContainerStorage connectWorkerContainerStorage;
@@ -81,18 +79,7 @@ public class WorkerUrlProvider {
   }
 
   private ExtensionsServiceEndpointGenerator getEndpointGenerator(String appId) {
-    SpServiceUrlProvider provider = null;
-    List<ConnectWorkerContainer> allConnectWorkerContainer = this.connectWorkerContainerStorage.getAllConnectWorkerContainers();
-
-    for (ConnectWorkerContainer connectWorkerContainer : allConnectWorkerContainer) {
-      if (connectWorkerContainer.getProtocols().stream().anyMatch(p -> p.getAppId().equals(appId))) {
-        provider = SpServiceUrlProvider.PROTOCOL;
-      } else if (connectWorkerContainer.getAdapters().stream().anyMatch(a -> a.getAppId().equals(appId))) {
-        provider = SpServiceUrlProvider.ADAPTER;
-      }
-    }
-
-    return new ExtensionsServiceEndpointGenerator(appId, provider);
+    return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.ADAPTER);
   }
 
 }
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java
index 7b6dfb4..6c874e3 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagementTest.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.connect.container.master.management;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -35,7 +35,7 @@ public class AdapterMasterManagementTest {
 
     @Test(expected = AdapterException.class)
     public void getAdapterFailNull() throws AdapterException {
-        AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         when(adapterStorage.getAllAdapters()).thenReturn(null);
 
         AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement(adapterStorage);
@@ -46,7 +46,7 @@ public class AdapterMasterManagementTest {
     @Test(expected = AdapterException.class)
     public void getAdapterFail() throws AdapterException {
         List<AdapterDescription> adapterDescriptions = Arrays.asList(new GenericAdapterStreamDescription());
-        AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         when(adapterStorage.getAllAdapters()).thenReturn(adapterDescriptions);
 
         AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement(adapterStorage);
@@ -57,25 +57,24 @@ public class AdapterMasterManagementTest {
     @Test
     public void getAllAdaptersSuccess() throws AdapterException {
         List<AdapterDescription> adapterDescriptions = Arrays.asList(new GenericAdapterStreamDescription());
-        AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         when(adapterStorage.getAllAdapters()).thenReturn(adapterDescriptions);
 
         AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement(adapterStorage);
 
-        List<AdapterDescription> result = adapterMasterManagement.getAllAdapters();
+        List<AdapterDescription> result = adapterMasterManagement.getAllAdapterInstances();
 
         assertEquals(1, result.size());
-        assertEquals(GenericAdapterStreamDescription.ID, result.get(0).getUri());
     }
 
     @Test(expected = AdapterException.class)
     public void getAllAdaptersFail() throws AdapterException {
-        AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         when(adapterStorage.getAllAdapters()).thenReturn(null);
 
         AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement(adapterStorage);
 
-        adapterMasterManagement.getAllAdapters();
+        adapterMasterManagement.getAllAdapterInstances();
 
     }
 }
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java
index 6fc7133..17bc969 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/AdapterTemplateMasterManagementTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@Deprecated
 public class AdapterTemplateMasterManagementTest {
 
     @Test
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
index b007fcb..bdad3b9 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
@@ -22,7 +22,7 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
-import org.apache.streampipes.storage.couchdb.impl.AdapterStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -56,7 +56,7 @@ public class SourcesManagementTest {
     @Ignore
     @Test
     public void addAdapterSuccess() throws Exception {
-        AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
         doNothing().when(WorkerRestClient.class, "invokeSetAdapter", anyString(), any());
@@ -72,7 +72,7 @@ public class SourcesManagementTest {
     @Ignore
     @Test(expected = AdapterException.class)
     public void addAdapterFail() throws Exception {
-        AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
 
@@ -84,7 +84,7 @@ public class SourcesManagementTest {
     @Ignore
     @Test
     public void detachAdapterSuccess() throws Exception {
-        AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
         doNothing().when(WorkerRestClient.class, "stopSetAdapter", anyString(), any());
@@ -99,7 +99,7 @@ public class SourcesManagementTest {
     @Ignore
     @Test(expected = AdapterException.class)
     public void detachAdapterFail() throws Exception {
-        AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
         org.powermock.api.mockito.PowerMockito.doThrow(new AdapterException()).when(WorkerRestClient.class, "stopSetAdapter", anyString(), any());
@@ -111,7 +111,7 @@ public class SourcesManagementTest {
     @Test
     public void getAllAdaptersInstallDescriptionSuccess() throws Exception {
 
-        AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
 
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
@@ -124,9 +124,8 @@ public class SourcesManagementTest {
 
     @Test(expected = AdapterException.class)
     public void getAllAdaptersInstallDescriptionFail() throws Exception {
-        AdapterStorageImpl adapterStorage = mock(AdapterStorageImpl.class);
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         AdapterDescription adapterDescription = new GenericAdapterSetDescription();
-        adapterDescription.setUri(" ");
         when(adapterStorage.getAllAdapters()).thenReturn(Arrays.asList(adapterDescription));
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
         sourcesManagement.setConnectHost("host");
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
index 6f868dd..032c633 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
@@ -50,7 +50,7 @@ public abstract class AdapterWorkerContainer extends StreamPipesExtensionsServic
 
   @Override
   public void afterServiceRegistered(SpServiceDefinition serviceDef) {
-    new ConnectWorkerRegistrationService().registerWorker(serviceDef.getServiceGroup());
+    new ConnectWorkerRegistrationService().registerWorker(serviceDef);
   }
 
   @Override
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
index 0b48a29..0fbc203 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
@@ -44,6 +44,7 @@ public class ConnectWorkerDescriptionProvider {
     List<AdapterDescription> adapters = new ArrayList<>();
     for (IAdapter<?> a : DeclarersSingleton.getInstance().getAllAdapters()) {
       AdapterDescription desc = (AdapterDescription) rewrite(a.declareModel());
+      desc.setCorrespondingServiceGroup(serviceGroup);
       adapters.add(desc);
     }
 
@@ -54,11 +55,13 @@ public class ConnectWorkerDescriptionProvider {
         GenericAdapterStreamDescription desc = new GenericAdapterStreamDescription();
         desc.setAppId(protocolDescription.getAppId());
         desc.setProtocolDescription(protocolDescription);
+        desc.setCorrespondingServiceGroup(serviceGroup);
         adapters.add(desc);
       } else if (protocolDescription instanceof ProtocolSetDescription) {
         GenericAdapterSetDescription desc = new GenericAdapterSetDescription();
         desc.setAppId(protocolDescription.getAppId());
         desc.setProtocolDescription(protocolDescription);
+        desc.setCorrespondingServiceGroup(serviceGroup);
         adapters.add(desc);
       }
     }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
index 44a242e..9de2137 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
@@ -18,6 +18,7 @@
 package org.apache.streampipes.connect.container.worker.init;
 
 import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
+import org.apache.streampipes.container.model.SpServiceDefinition;
 import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
 import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
 import org.slf4j.Logger;
@@ -30,7 +31,7 @@ public class ConnectWorkerRegistrationService {
 
   private static final Logger LOG = LoggerFactory.getLogger(ConnectWorkerRegistrationService.class);
 
-  public void registerWorker(String serviceGroup) {
+  public void registerWorker(SpServiceDefinition serviceDef) {
     boolean connected = false;
 
     while (!connected) {
@@ -39,7 +40,7 @@ public class ConnectWorkerRegistrationService {
         String masterUrl = coreServices.get(0) + "/streampipes-backend";
         LOG.info("Trying to connect to master: " + masterUrl);
         connected = MasterRestClient.register(masterUrl,
-                new ConnectWorkerDescriptionProvider().getContainerDescription(serviceGroup));
+                new ConnectWorkerDescriptionProvider().getContainerDescription(serviceDef.getServiceGroup()));
 
         if (connected) {
           LOG.info("Successfully connected to master: " + masterUrl + " Worker is now running.");
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index a95b185..63b8d15 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -50,7 +50,7 @@ public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServ
 
     @Override
     public void afterServiceRegistered(SpServiceDefinition serviceDef) {
-        new ConnectWorkerRegistrationService().registerWorker(serviceDef.getServiceGroup());
+        new ConnectWorkerRegistrationService().registerWorker(serviceDef);
     }
 
     @Override
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
index eeb9189..0a171da 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
@@ -61,6 +61,9 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
     //  Is used to store where the adapter is running to stop it
     private String selectedEndpointUrl;
 
+    /**
+     * This is used to identify all the service within the service group the adapter can be invoked in
+     */
     private String correspondingServiceGroup;
 
     public AdapterDescription() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionList.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionList.java
index 5718aa3..81b6484 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionList.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionList.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 @TsModel
+@Deprecated
 public class AdapterDescriptionList extends NamedStreamPipesEntity {
 
     private List<AdapterDescription> list;
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index 52721d9..680879a 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -68,7 +68,7 @@ public class ResetManagement {
         AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
 
         try {
-            List<AdapterDescription> allAdapters = adapterMasterManagement.getAllAdapters();
+            List<AdapterDescription> allAdapters = adapterMasterManagement.getAllAdapterInstances();
             allAdapters.forEach(adapterDescription -> {
                 try {
                     adapterMasterManagement.deleteAdapter(adapterDescription.getElementId());
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index 0831269..68e8594 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
 import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import org.slf4j.Logger;
@@ -117,7 +116,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
 
         try {
             managementService.deleteAdapter(elementId);
-            return ok(Notifications.success("Adapter deleted."));
+            return ok(Notifications.success("Adapter with id: " + elementId + " is deleted."));
         } catch (AdapterException e) {
             LOG.error("Error while deleting adapter with id " + elementId, e);
             return ok(Notifications.error(e.getMessage()));
@@ -129,9 +128,8 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
     @Produces(MediaType.APPLICATION_JSON)
     public Response getAllAdapters() {
         try {
-            List<AdapterDescription> allAdapterDescription = managementService.getAllAdapters();
-            AdapterDescriptionList result = new AdapterDescriptionList();
-            result.setList(allAdapterDescription);
+            // TODO get all invoced adapters
+            List<AdapterDescription> result = managementService.getAllAdapterInstances();
 
             return ok(result);
         } catch (AdapterException e) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterTemplateResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterTemplateResource.java
index aa57494..4763666 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterTemplateResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterTemplateResource.java
@@ -31,6 +31,7 @@ import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+@Deprecated
 @Path("/v2/connect/master/adapters/template")
 public class AdapterTemplateResource extends AbstractAdapterResource<AdapterTemplateMasterManagement> {
 
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
index 9f9ab7e..f22648e 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
@@ -22,7 +22,6 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.DescriptionManagement;
 import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
 import org.apache.streampipes.model.connect.grounding.FormatDescriptionList;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescriptionList;
@@ -36,6 +35,7 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.util.List;
 import java.util.Optional;
 
 @Path("/v2/connect/master/description")
@@ -59,6 +59,7 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
         return ok(result);
     }
 
+    @Deprecated
     @GET
     @JacksonSerialized
     @Path("/protocols")
@@ -74,7 +75,7 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
     @Path("/adapters")
     @Produces(MediaType.APPLICATION_JSON)
     public Response getAdapters() {
-        AdapterDescriptionList result = managementService.getAdapters();
+        List<AdapterDescription> result = managementService.getAdapters();
 
         return ok(result);
     }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
index cfb611a..8b6d4cd 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
@@ -75,13 +75,13 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
     public Response addAdapter(@PathParam("streamId") String elementId,
                                SpDataSet dataSet) {
 
-        String responseMessage = "Instance of data set " + dataSet.getUri() + " successfully started";
+        String responseMessage = "Instance of data set " + dataSet.getElementId() + " successfully started";
 
         try {
             managementService.addAdapter(elementId,  dataSet);
         } catch (AdapterException | NoServiceEndpointsAvailableException e) {
-            LOG.error("Could not set data set instance: " + dataSet.getUri(), e);
-            return ok(Notifications.error("Could not set data set instance: " + dataSet.getUri()));
+            LOG.error("Could not set data set instance: " + dataSet.getElementId(), e);
+            return ok(Notifications.error("Could not set data set instance: " + dataSet.getElementId()));
         }
 
         return ok(Notifications.success(responseMessage));
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WelcomePageMaster.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WelcomePageMaster.java
index 46e82e1..5730300 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WelcomePageMaster.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WelcomePageMaster.java
@@ -77,7 +77,7 @@ public class WelcomePageMaster extends AbstractAdapterResource<AdapterMasterMana
 	private void getAllRunningAdapters(HtmlCanvas canvas) throws IOException {
 
 		try {
-			for (AdapterDescription ad : managementService.getAllAdapters()) {
+			for (AdapterDescription ad : managementService.getAllAdapterDescriptions()) {
 				canvas.li().write(ad.getElementId())._li();
 				canvas.ul().li().write("Kafka Topic: " + GroundingService.extractTopic(ad))._li()._ul();
 			}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
index a225b8c..27c7a94 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
@@ -51,7 +51,7 @@ public class WorkerAdministrationResource extends AbstractSharedRestInterface {
         // Change this to List<AdapterDescription>
         // How do I store the available AdapterDescriptions when there is no ConnectWorkerContainer
 //        LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
-//        this.workerAdministrationManagement.register(connectWorkerContainer);
+        this.workerAdministrationManagement.register(availableAdapterDescription);
         System.out.println(availableAdapterDescription);
 
         return ok(Notifications.success("Worker Container successfully added"));
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterTemplateStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterTemplateStorage.java
index add9fa6..b2f7352 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterTemplateStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterTemplateStorage.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 
 import java.util.List;
 
+@Deprecated
 public interface IAdapterTemplateStorage {
     List<AdapterDescription> getAllAdapterTemplates();
 
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
index 54f8f27..d4e6415 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
@@ -21,8 +21,6 @@ public interface INoSqlStorage {
 
   IAdapterStorage getAdapterStorage();
 
-  IAdapterTemplateStorage getAdapterTemplateStorage();
-
   ICategoryStorage getCategoryStorageAPI();
 
   ILabelStorage getLabelStorageAPI();
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
index 288ae0f..380cb23 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
@@ -26,12 +26,7 @@ public enum CouchDbStorageManager implements INoSqlStorage {
 
   @Override
   public IAdapterStorage getAdapterStorage() {
-    return new AdapterStorageImpl();
-  }
-
-  @Override
-  public IAdapterTemplateStorage getAdapterTemplateStorage() {
-    return new AdapterTemplateStorageImpl();
+    return new AdapterDescriptionStorageImpl();
   }
 
   @Override
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterDescriptionStorageImpl.java
similarity index 87%
copy from streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java
copy to streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterDescriptionStorageImpl.java
index 8dd1fec..327e562 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterDescriptionStorageImpl.java
@@ -18,26 +18,26 @@
 
 package org.apache.streampipes.storage.couchdb.impl;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.couchdb.dao.AbstractDao;
 import org.apache.streampipes.storage.couchdb.dao.DbCommand;
 import org.apache.streampipes.storage.couchdb.dao.FindCommand;
 import org.apache.streampipes.storage.couchdb.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Optional;
 
-public class AdapterStorageImpl extends AbstractDao<AdapterDescription> implements IAdapterStorage {
+public class AdapterDescriptionStorageImpl extends AbstractDao<AdapterDescription> implements IAdapterStorage {
 
-    Logger LOG = LoggerFactory.getLogger(AdapterStorageImpl.class);
+    Logger LOG = LoggerFactory.getLogger(AdapterDescriptionStorageImpl.class);
 
     private static final String SYSTEM_USER = "system";
 
-    public AdapterStorageImpl() {
-        super(Utils::getCouchDbAdapterClient, AdapterDescription.class);
+    public AdapterDescriptionStorageImpl() {
+        super(Utils::getCouchDbAdapterDescriptionClient, AdapterDescription.class);
     }
 
     @Override
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterInstanceStorageImpl.java
similarity index 88%
rename from streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java
rename to streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterInstanceStorageImpl.java
index 8dd1fec..da84fb8 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterInstanceStorageImpl.java
@@ -30,14 +30,14 @@ import org.apache.streampipes.storage.couchdb.utils.Utils;
 import java.util.List;
 import java.util.Optional;
 
-public class AdapterStorageImpl extends AbstractDao<AdapterDescription> implements IAdapterStorage {
+public class AdapterInstanceStorageImpl extends AbstractDao<AdapterDescription> implements IAdapterStorage {
 
-    Logger LOG = LoggerFactory.getLogger(AdapterStorageImpl.class);
+    Logger LOG = LoggerFactory.getLogger(AdapterInstanceStorageImpl.class);
 
     private static final String SYSTEM_USER = "system";
 
-    public AdapterStorageImpl() {
-        super(Utils::getCouchDbAdapterClient, AdapterDescription.class);
+    public AdapterInstanceStorageImpl() {
+        super(Utils::getCouchDbAdapterInstanceClient, AdapterDescription.class);
     }
 
     @Override
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterTemplateStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterTemplateStorageImpl.java
index 99d5373..b7fed7b 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterTemplateStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterTemplateStorageImpl.java
@@ -30,9 +30,10 @@ import org.apache.streampipes.storage.couchdb.utils.Utils;
 import java.util.List;
 import java.util.Optional;
 
+@Deprecated
 public class AdapterTemplateStorageImpl extends AbstractDao<AdapterDescription> implements IAdapterTemplateStorage {
 
-  Logger LOG = LoggerFactory.getLogger(AdapterStorageImpl.class);
+  Logger LOG = LoggerFactory.getLogger(AdapterInstanceStorageImpl.class);
 
   private static final String SYSTEM_USER = "system";
 
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
index bbfd343..ed360bc 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
@@ -68,8 +68,12 @@ public class Utils {
     return getCouchDbGsonClient("assetdashboard");
   }
 
-  public static CouchDbClient getCouchDbAdapterClient() {
-    return getCouchDbAdapterClient("adapter");
+  public static CouchDbClient getCouchDbAdapterInstanceClient() {
+    return getCouchDbAdapterClient("adapterinstance");
+  }
+
+  public static CouchDbClient getCouchDbAdapterDescriptionClient() {
+    return getCouchDbAdapterClient("adapterdescription");
   }
 
   public static CouchDbClient getCouchDbPipelineClient() {
diff --git a/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts b/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts
index 2d0a49e..26c1b8d 100644
--- a/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts
+++ b/ui/src/app/connect/components/data-marketplace/data-marketplace.component.ts
@@ -91,10 +91,10 @@ export class DataMarketplaceComponent implements OnInit {
     this.adapterDescriptions = [];
 
     this.dataMarketplaceService
-      .getGenericAndSpecificAdapterDescriptions()
+      .getAdapterDescriptions()
       .subscribe((allAdapters) => {
-        this.adapterDescriptions = this.adapterDescriptions.concat(allAdapters[0]);
-        this.adapterDescriptions = this.adapterDescriptions.concat(allAdapters[1]);
+        this.adapterDescriptions = allAdapters;
+        // this.adapterDescriptions = this.adapterDescriptions.concat(allAdapters[1]);
         this.adapterDescriptions
           .sort((a, b) => a.name.localeCompare(b.name));
         this.filteredAdapterDescriptions = this.adapterDescriptions;
@@ -105,16 +105,16 @@ export class DataMarketplaceComponent implements OnInit {
         this.adapterLoadingError = true;
       });
 
-    this.dataMarketplaceService.getAdapterTemplates().subscribe(adapterTemplates => {
-      adapterTemplates.forEach((adapterTemplate) => {
-        (adapterTemplate as any).isTemplate = true;
-      });
-
-      this.adapterDescriptions = this.adapterDescriptions.concat(adapterTemplates);
-      this.adapterDescriptions
-        .sort((a, b) => a.name.localeCompare(b.name));
-      this.filteredAdapterDescriptions = this.adapterDescriptions;
-    });
+    // this.dataMarketplaceService.getAdapterTemplates().subscribe(adapterTemplates => {
+    //   adapterTemplates.forEach((adapterTemplate) => {
+    //     (adapterTemplate as any).isTemplate = true;
+    //   });
+    //
+    //   this.adapterDescriptions = this.adapterDescriptions.concat(adapterTemplates);
+    //   this.adapterDescriptions
+    //     .sort((a, b) => a.name.localeCompare(b.name));
+    //   this.filteredAdapterDescriptions = this.adapterDescriptions;
+    // });
   }
 
   getAdaptersRunning(): void {
diff --git a/ui/src/app/connect/services/data-marketplace.service.ts b/ui/src/app/connect/services/data-marketplace.service.ts
index 9f1a9b7..50f73bd 100644
--- a/ui/src/app/connect/services/data-marketplace.service.ts
+++ b/ui/src/app/connect/services/data-marketplace.service.ts
@@ -23,20 +23,14 @@ import { AuthStatusService } from '../../services/auth-status.service';
 import { ConnectService } from './connect.service';
 import {
   AdapterDescription,
-  AdapterDescriptionList,
   AdapterDescriptionUnion,
-  EventSchema,
   GenericAdapterSetDescription,
   GenericAdapterStreamDescription,
   Message,
-  ProtocolDescription,
-  ProtocolDescriptionList,
-  SpDataSet,
-  SpDataStream,
   SpecificAdapterSetDescription,
   SpecificAdapterStreamDescription
 } from '../../core-model/gen/streampipes-model';
-import { Observable, zip } from 'rxjs';
+import { Observable } from 'rxjs';
 import { PlatformServicesCommons } from '../../platform-services/apis/commons.service';
 
 @Injectable()
@@ -54,21 +48,14 @@ export class DataMarketplaceService {
   }
 
   getAdapterDescriptions(): Observable<AdapterDescriptionUnion[]> {
-    return this.requestAdapterDescriptions('/master/description/adapters').pipe(map(response => {
-      return (response as any[])
-        .map(resp => AdapterDescription.fromDataUnion(resp))
-        .filter(ad => this.connectService.isSpecificDescription(ad));
-    }));
+    return this.requestAdapterDescriptions('/master/description/adapters');
+
   }
 
   getAdapters(): Observable<AdapterDescriptionUnion[]> {
     return this.requestAdapterDescriptions('/master/adapters');
   }
 
-  getAdapterTemplates(): Observable<AdapterDescriptionUnion[]> {
-    return this.requestAdapterDescriptions('/master/adapters/template/all');
-  }
-
   requestAdapterDescriptions(path: string): Observable<AdapterDescriptionUnion[]> {
     return this.http
       .get(
@@ -76,8 +63,7 @@ export class DataMarketplaceService {
         path
       )
       .pipe(map(response => {
-        const adapterDescriptionList: AdapterDescriptionList = AdapterDescriptionList.fromData(response as AdapterDescriptionList);
-        return adapterDescriptionList.list;
+        return (response as any[]).map(p => AdapterDescription.fromDataUnion(p));
       }));
   }
 
@@ -120,50 +106,53 @@ export class DataMarketplaceService {
     );
   }
 
-  getProtocols(): Observable<AdapterDescriptionUnion[]> {
-    return this.http
-      .get(
-        `${this.connectPath}/master/description/protocols`
-      )
-      .pipe(map(response => {
-        const adapterDescriptions: AdapterDescriptionUnion[] = [];
-        const protocols: ProtocolDescription[] = (ProtocolDescriptionList.fromData(response as ProtocolDescriptionList)).list;
-
-        for (const protocol of protocols) {
-          let newAdapterDescription: AdapterDescriptionUnion;
-          if (protocol.sourceType === 'SET') {
-            newAdapterDescription = new GenericAdapterSetDescription();
-            newAdapterDescription['@class'] = 'org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription';
-            newAdapterDescription.dataSet = new SpDataSet();
-            newAdapterDescription.dataSet.eventSchema = new EventSchema();
-          } else if (protocol.sourceType === 'STREAM') {
-            newAdapterDescription = new GenericAdapterStreamDescription();
-            newAdapterDescription['@class'] = 'org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription';
-            newAdapterDescription.dataStream = new SpDataStream();
-            newAdapterDescription.dataStream.eventSchema = new EventSchema();
-          }
-          newAdapterDescription.appId = protocol.appId;
-          newAdapterDescription.name = protocol.name;
-          newAdapterDescription.description = protocol.description;
-          newAdapterDescription.iconUrl = protocol.iconUrl;
-          newAdapterDescription.uri = newAdapterDescription.elementId;
-          newAdapterDescription.category = protocol.category;
-          newAdapterDescription.includedAssets = protocol.includedAssets;
-          newAdapterDescription.includesAssets = protocol.includesAssets;
-          newAdapterDescription.includedLocales = protocol.includedLocales;
-          newAdapterDescription.includesLocales = protocol.includesLocales;
-
-          if (
-            newAdapterDescription instanceof GenericAdapterSetDescription ||
-            newAdapterDescription instanceof GenericAdapterStreamDescription
-          ) {
-            newAdapterDescription.protocolDescription = protocol;
-          }
-          adapterDescriptions.push(newAdapterDescription);
-        }
-        return adapterDescriptions;
-      }));
-  }
+  /**
+   * @deprecated The method should not be used
+   */
+  // getProtocols(): Observable<AdapterDescriptionUnion[]> {
+  //   return this.http
+  //     .get(
+  //       `${this.connectPath}/master/description/protocols`
+  //     )
+  //     .pipe(map(response => {
+  //       const adapterDescriptions: AdapterDescriptionUnion[] = [];
+  //       const protocols: ProtocolDescription[] = (ProtocolDescriptionList.fromData(response as ProtocolDescriptionList)).list;
+  //
+  //       for (const protocol of protocols) {
+  //         let newAdapterDescription: AdapterDescriptionUnion;
+  //         if (protocol.sourceType === 'SET') {
+  //           newAdapterDescription = new GenericAdapterSetDescription();
+  //           newAdapterDescription['@class'] = 'org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription';
+  //           newAdapterDescription.dataSet = new SpDataSet();
+  //           newAdapterDescription.dataSet.eventSchema = new EventSchema();
+  //         } else if (protocol.sourceType === 'STREAM') {
+  //           newAdapterDescription = new GenericAdapterStreamDescription();
+  //           newAdapterDescription['@class'] = 'org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription';
+  //           newAdapterDescription.dataStream = new SpDataStream();
+  //           newAdapterDescription.dataStream.eventSchema = new EventSchema();
+  //         }
+  //         newAdapterDescription.appId = protocol.appId;
+  //         newAdapterDescription.name = protocol.name;
+  //         newAdapterDescription.description = protocol.description;
+  //         newAdapterDescription.iconUrl = protocol.iconUrl;
+  //         newAdapterDescription.uri = newAdapterDescription.elementId;
+  //         newAdapterDescription.category = protocol.category;
+  //         newAdapterDescription.includedAssets = protocol.includedAssets;
+  //         newAdapterDescription.includesAssets = protocol.includesAssets;
+  //         newAdapterDescription.includedLocales = protocol.includedLocales;
+  //         newAdapterDescription.includesLocales = protocol.includesLocales;
+  //
+  //         if (
+  //           newAdapterDescription instanceof GenericAdapterSetDescription ||
+  //           newAdapterDescription instanceof GenericAdapterStreamDescription
+  //         ) {
+  //           newAdapterDescription.protocolDescription = protocol;
+  //         }
+  //         adapterDescriptions.push(newAdapterDescription);
+  //       }
+  //       return adapterDescriptions;
+  //     }));
+  // }
 
   // sortStaticProperties(sp: StaticProperty) {
   //   if (sp instanceof AlternativesStaticProperty) {
@@ -182,9 +171,9 @@ export class DataMarketplaceService {
   //   }
   // }
 
-  getGenericAndSpecificAdapterDescriptions(): Observable<[AdapterDescriptionUnion[], AdapterDescriptionUnion[]]> {
-    return zip(this.getAdapterDescriptions(), this.getProtocols());
-  }
+  // getGenericAndSpecificAdapterDescriptions(): Observable<[AdapterDescriptionUnion[], AdapterDescriptionUnion[]]> {
+  //   return zip(this.getAdapterDescriptions(), this.getProtocols());
+  // }
 
   cloneAdapterDescription(toClone: AdapterDescriptionUnion): AdapterDescriptionUnion {
     let result: AdapterDescriptionUnion;