You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/06/21 20:47:47 UTC

[incubator-streampipes] branch STREAMPIPES-319 updated: [STREAMPIPES-319] Improve service discovery for Connect worker and pipelines

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-319 by this push:
     new adefce4  [STREAMPIPES-319] Improve service discovery for Connect worker and pipelines
adefce4 is described below

commit adefce45d3343c2502db56f10069b71d34845732
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon Jun 21 22:47:29 2021 +0200

    [STREAMPIPES-319] Improve service discovery for Connect worker and pipelines
---
 streampipes-backend/pom.xml                        |   4 +
 .../backend/StreamPipesBackendApplication.java     |  17 ++--
 .../backend/StreamPipesResourceConfig.java         |   2 +-
 .../NoServiceEndpointsAvailableException.java      |   9 +-
 .../master/management/AdapterMasterManagement.java |  44 ++++-----
 .../master/management/DescriptionManagement.java   |  23 ++---
 .../master/management/GuessManagement.java         |  21 ++--
 .../master/management/SourcesManagement.java       |  14 ++-
 .../connect/container/master/management/Utils.java |  58 -----------
 .../management/WorkerAdministrationManagement.java |  32 +++---
 .../master/management/WorkerRestClient.java        |  77 +++++----------
 .../master/management/WorkerUrlProvider.java       |  81 +++++++++++++++
 .../container/master/rest/AdapterResource.java     |  32 +++---
 .../container/master/rest/DescriptionResource.java |  41 ++++----
 .../master/rest/RuntimeResolvableResource.java     |  16 +--
 .../container/master/rest/SourcesResource.java     |   5 +-
 .../master/rest/WorkerAdministrationResource.java  |   2 +-
 .../connect/container/master/util/Utils.java       |  15 ++-
 .../connect/container/master/util/WorkerPaths.java |  68 +++++++++++++
 .../master/management/WorkerRestClientTest.java    |   4 +-
 .../init/AdapterServiceResourceProvider.java       |   2 +-
 .../worker/init/AdapterWorkerContainer.java        |   7 +-
 .../init/ConnectWorkerDescriptionProvider.java     |  19 +---
 .../init/ConnectWorkerRegistrationService.java     |   9 +-
 .../worker/init/ConnectWorkerTagProvider.java      |   3 +-
 .../container/worker/rest/AdapterResource.java     |   2 +-
 .../container/worker/rest/GuessResource.java       |   2 +-
 .../worker/rest/HttpServerAdapterResource.java     |   2 +-
 .../container/worker/rest/ProtocolResource.java    |   2 +-
 .../worker/rest/RuntimeResolvableResource.java     |   2 +-
 .../container/worker/rest/WelcomePageWorker.java   |   2 +-
 .../container/worker/rest/WorkerResource.java      |   2 +-
 .../container/base/StreamPipesServiceBase.java     |  19 +++-
 .../extensions/ExtensionsModelSubmitter.java       |   2 +-
 .../init/PipelineElementServiceTagProvider.java    |   3 +-
 .../api/DataProcessorPipelineElementResource.java  |   4 +-
 .../api/DataSinkPipelineElementResource.java       |   4 +-
 .../api/DataStreamPipelineElementResource.java     |   4 +-
 .../container/html/page/WelcomePageGenerator.java  |   7 +-
 ...ndpoint.java => ExtensionsServiceEndpoint.java} |   6 +-
 ...tem.java => ExtensionsServiceEndpointItem.java} |  10 +-
 .../model/connect/adapter/AdapterDescription.java  |  11 +++
 .../connect/worker/ConnectWorkerContainer.java     |  29 +++---
 .../model/dashboard/DashboardEntity.java           |   3 +-
 .../streampipes/manager/assets/AssetFetcher.java   |  15 +--
 .../streampipes/manager/assets/AssetManager.java   |   9 +-
 .../manager/endpoint/EndpointFetcher.java          |  22 ++---
 .../manager/endpoint/EndpointItemFetcher.java      |  20 ++--
 .../http/ExtensionsServiceEndpointGenerator.java   |  65 ++++++++++++
 .../http/PipelineElementEndpointGenerator.java     |  53 ----------
 .../manager/execution/http/PipelineExecutor.java   |  63 ++++++++----
 .../streampipes/manager/operations/Operations.java |   6 +-
 .../manager/setup/CouchDbInstallationStep.java     |   8 +-
 .../manager/setup/InstallationConfiguration.java   |   4 +-
 .../setup/PipelineElementInstallationStep.java     |  12 +--
 .../verification/DataProcessorVerifier.java        |   9 +-
 .../manager/verification/DataSinkVerifier.java     |   9 +-
 .../manager/verification/DataStreamVerifier.java   |   9 +-
 .../manager/verification/ElementVerifier.java      |   9 +-
 ...java => ExtensionsServiceEndpointResource.java} |  39 ++++----
 .../svcdiscovery/api/ISpServiceDiscovery.java      |  16 ++-
 .../api/model/DefaultSpServiceGroups.java          |   1 +
 .../api/model/DefaultSpServiceTags.java            |  10 +-
 .../api/model/SpServicePathPrefix.java             |   7 +-
 .../api/model/SpServiceRegistrationRequest.java    | 109 +++++++++++++++++++++
 .../svcdiscovery/api/model/SpServiceTagPrefix.java |   3 +-
 .../api/model/SpServiceUrlProvider.java            |  27 +++--
 .../consul/SpConsulServiceDiscovery.java           |  67 ++++++-------
 .../consul/model/HealthCheckConfiguration.java     |  15 ++-
 .../base/StreamPipesExtensionsServiceBase.java     |   3 +-
 ...java => IExtensionsServiceEndpointStorage.java} |  10 +-
 .../streampipes/storage/api/INoSqlStorage.java     |   2 +-
 .../storage/couchdb/CouchDbStorageManager.java     |   4 +-
 ...a => ExtensionsServiceEndpointStorageImpl.java} |  37 ++++---
 .../connect/services/data-marketplace.service.ts   |   6 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  26 ++---
 76 files changed, 831 insertions(+), 585 deletions(-)

diff --git a/streampipes-backend/pom.xml b/streampipes-backend/pom.xml
index e90508f..1185e21 100644
--- a/streampipes-backend/pom.xml
+++ b/streampipes-backend/pom.xml
@@ -69,6 +69,10 @@
             <groupId>io.swagger.core.v3</groupId>
             <artifactId>swagger-jaxrs2</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
 
         <!-- Dependency convergence -->
         <dependency>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 43ad4a8..99e650e 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -31,7 +31,6 @@ import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
 import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -45,7 +44,10 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.servlet.ServletContextListener;
 import java.net.UnknownHostException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -237,13 +239,14 @@ public class StreamPipesBackendApplication extends StreamPipesServiceBase {
   @Override
   protected List<SpServiceTag> getServiceTags() {
     return Arrays.asList(
-            createSysTag(DefaultSpServiceTags.CORE),
-            createSysTag(DefaultSpServiceTags.CONNECT_MASTER),
-            createSysTag(DefaultSpServiceTags.STREAMPIPES_CLIENT)
+            DefaultSpServiceTags.CORE,
+            DefaultSpServiceTags.CONNECT_MASTER,
+            DefaultSpServiceTags.STREAMPIPES_CLIENT
     );
   }
 
-  private SpServiceTag createSysTag(String value) {
-    return SpServiceTag.create(SpServiceTagPrefix.SYSTEM, value);
+  @Override
+  protected String getHealthCheckPath() {
+    return "/streampipes-backend";
   }
 }
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index 748a0da..1c9f03b 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -82,7 +82,7 @@ public class StreamPipesResourceConfig extends ResourceConfig {
     register(PipelineNoUserResource.class);
     register(PipelineTemplate.class);
     register(PipelineResource.class);
-    register(RdfEndpoint.class);
+    register(ExtensionsServiceEndpointResource.class);
     register(SemanticEventConsumer.class);
     register(SemanticEventProcessingAgent.class);
     register(SemanticEventProducer.class);
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoServiceEndpointsAvailableException.java
similarity index 79%
copy from streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoServiceEndpointsAvailableException.java
index d263f07..13f4e38 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoServiceEndpointsAvailableException.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.svcdiscovery.api.model;
+package org.apache.streampipes.commons.exceptions;
 
-public class DefaultSpServiceGroups {
-
-  public static final String CORE = "core";
+public class NoServiceEndpointsAvailableException extends Exception {
 
+  public NoServiceEndpointsAvailableException(String message) {
+    super(message);
+  }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index c5703fc..faef9ee 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -18,11 +18,12 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.connect.adapter.GroundingService;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
+import org.apache.streampipes.connect.container.master.util.Utils;
 import org.apache.streampipes.manager.storage.UserService;
 import org.apache.streampipes.manager.verification.DataStreamVerifier;
 import org.apache.streampipes.model.SpDataStream;
@@ -50,16 +51,18 @@ public class AdapterMasterManagement {
   private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
 
   private IAdapterStorage adapterStorage;
+  private WorkerUrlProvider workerUrlProvider;
 
   public AdapterMasterManagement() {
     this.adapterStorage = getAdapterStorage();
+    this.workerUrlProvider = new WorkerUrlProvider();
   }
 
   public AdapterMasterManagement(IAdapterStorage adapterStorage) {
     this.adapterStorage = adapterStorage;
   }
 
-  public void startAllStreamAdapters(ConnectWorkerContainer connectWorkerContainer) throws AdapterException {
+  public void startAllStreamAdapters(ConnectWorkerContainer connectWorkerContainer) throws AdapterException, NoServiceEndpointsAvailableException {
     IAdapterStorage adapterStorage = getAdapterStorage();
     List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
 
@@ -67,10 +70,10 @@ public class AdapterMasterManagement {
       if (ad instanceof AdapterStreamDescription) {
         AdapterDescription decryptedAdapterDescription =
                 new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
-        String wUrl = new Utils().getWorkerUrl(decryptedAdapterDescription);
+        String wUrl = workerUrlProvider.getWorkerUrlForAdapter(decryptedAdapterDescription);
 
-        if (wUrl.equals(connectWorkerContainer.getEndpointUrl())) {
-          String url = Utils.addUserNameToApi(connectWorkerContainer.getEndpointUrl(),
+        if (wUrl.equals(connectWorkerContainer.getServiceGroup())) {
+          String url = Utils.addUserNameToApi(connectWorkerContainer.getServiceGroup(),
                   decryptedAdapterDescription.getUserName());
 
           WorkerRestClient.invokeStreamAdapter(url, (AdapterStreamDescription) decryptedAdapterDescription);
@@ -81,7 +84,7 @@ public class AdapterMasterManagement {
   }
 
   public String addAdapter(AdapterDescription ad,
-                           String baseUrl,
+                           String endpointUrl,
                            String username)
           throws AdapterException {
 
@@ -90,12 +93,9 @@ public class AdapterMasterManagement {
     ad.setEventGrounding(eventGrounding);
 
     String uuid = UUID.randomUUID().toString();
-
-//    String newId = ConnectContainerConfig.INSTANCE.getConnectContainerMasterUrl() + "api/v1/" + username + "/master/sources/" + uuid;
-    String newId = BackendConfig.INSTANCE.getBackendApiUrl() + "api/v2/connect/" + username + "/master/sources/" + uuid;
-
-    ad.setElementId(newId);
+    ad.setElementId(ad.getElementId() + ":" + uuid);
     ad.setCreatedAt(System.currentTimeMillis());
+    ad.setSelectedEndpointUrl(endpointUrl);
 
     AdapterDescription encryptedAdapterDescription =
             new AdapterEncryptionService(new Cloner().adapterDescription(ad)).encrypt();
@@ -105,16 +105,12 @@ public class AdapterMasterManagement {
     // start when stream adapter
     if (ad instanceof AdapterStreamDescription) {
       // TODO
-      WorkerRestClient.invokeStreamAdapter(baseUrl, adapterId);
+      WorkerRestClient.invokeStreamAdapter(endpointUrl, adapterId);
       LOG.info("Start adapter");
     }
 
-    // backend url is used to install data source in streampipes
-    String backendBaseUrl = BackendConfig.INSTANCE.getBackendApiUrl() + "api/v2/";
-    String requestUrl = backendBaseUrl + "noauth/users/" + username + "/element";
-
-    LOG.info("Install source (source URL: " + newId + " in backend over URL: " + requestUrl);
-    SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(newId);
+    LOG.info("Install source (source URL: {} in backend", ad.getElementId());
+    SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
     installDataSource(storedDescription, username);
 
     return storedDescription.getElementId();
@@ -124,7 +120,7 @@ public class AdapterMasterManagement {
     try {
       new DataStreamVerifier(stream).verifyAndAdd(username, true, true);
     } catch (SepaParseException e) {
-      LOG.error("Error while installing data source: " + stream.getElementId(), e);
+      LOG.error("Error while installing data source: {}", stream.getElementId(), e);
       throw new AdapterException();
     }
   }
@@ -143,14 +139,14 @@ public class AdapterMasterManagement {
     throw new AdapterException("Could not find adapter with id: " + id);
   }
 
-  public void deleteAdapter(String id, String baseUrl) throws AdapterException {
+  public void deleteAdapter(String id) throws AdapterException {
     //        // IF Stream adapter delete it
     boolean isStreamAdapter = isStreamAdapter(id);
+    AdapterDescription ad = adapterStorage.getAdapter(id);
 
     if (isStreamAdapter) {
-      stopStreamAdapter(id, baseUrl);
+      stopStreamAdapter(id, ad.getSelectedEndpointUrl());
     }
-    AdapterDescription ad = adapterStorage.getAdapter(id);
     String username = ad.getUserName();
 
     adapterStorage.deleteAdapter(id);
@@ -190,16 +186,16 @@ public class AdapterMasterManagement {
       throw new AdapterException("Adapter " + adapterId + "is not a stream adapter.");
     } else {
       WorkerRestClient.stopStreamAdapter(baseUrl, (AdapterStreamDescription) ad);
-
     }
   }
 
   public void startStreamAdapter(String adapterId, String baseUrl) throws AdapterException {
     AdapterDescription ad = adapterStorage.getAdapter(adapterId);
-
     if (!isStreamAdapter(adapterId)) {
       throw new AdapterException("Adapter " + adapterId + "is not a stream adapter.");
     } else {
+      ad.setSelectedEndpointUrl(baseUrl);
+      adapterStorage.updateAdapter(ad);
       WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) ad);
     }
   }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
index 18a21a8..b2e60a0 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/DescriptionManagement.java
@@ -87,27 +87,16 @@ public class DescriptionManagement {
                 .findFirst();
     }
 
-    public String getAdapterAssets(AdapterDescription desc, String baseUrl) throws AdapterException {
-        return WorkerRestClient.getAdapterAssets(baseUrl, desc);
+    public String getAssets(String baseUrl) throws AdapterException {
+        return WorkerRestClient.getAssets(baseUrl);
     }
 
-    public byte[] getAdapterIconAsset(AdapterDescription desc, String baseUrl) throws AdapterException {
-        return WorkerRestClient.getAdapterIconAsset(baseUrl, desc);
+    public byte[] getIconAsset(String baseUrl) throws AdapterException {
+        return WorkerRestClient.getIconAsset(baseUrl);
     }
 
-    public String getAdapterDocumentationAsset(AdapterDescription desc, String baseUrl) throws AdapterException {
-        return WorkerRestClient.getAdapterDocumentationAsset(baseUrl, desc);
+    public String getDocumentationAsset(String baseUrl) throws AdapterException {
+        return WorkerRestClient.getDocumentationAsset(baseUrl);
     }
 
-    public String getProtocolAssets(ProtocolDescription desc, String baseUrl) throws AdapterException {
-        return WorkerRestClient.getProtocolAssets(baseUrl, desc);
-    }
-
-    public byte[] getProtocolIconAsset(ProtocolDescription desc, String baseUrl) throws AdapterException {
-        return WorkerRestClient.getProtocolIconAsset(baseUrl, desc);
-    }
-
-    public String getProtocolDocumentationAsset(ProtocolDescription desc, String baseUrl) throws AdapterException {
-        return WorkerRestClient.getProtocolDocumentationAsset(baseUrl, desc);
-    }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
index e825369..0d867bb 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/GuessManagement.java
@@ -25,9 +25,11 @@ import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.entity.ContentType;
 import org.apache.http.util.EntityUtils;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.connect.api.exception.WorkerAdapterException;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.message.ErrorMessage;
@@ -40,19 +42,19 @@ import java.io.IOException;
 public class GuessManagement {
 
     private static Logger LOG = LoggerFactory.getLogger(GuessManagement.class);
+    private WorkerUrlProvider workerUrlProvider;
 
     public GuessManagement() {
-
+        this.workerUrlProvider = new WorkerUrlProvider();
     }
 
     public GuessSchema guessSchema(AdapterDescription adapterDescription) throws AdapterException, ParseException, WorkerAdapterException {
-        String workerUrl = new Utils().getWorkerUrl(adapterDescription);
-
-        workerUrl = workerUrl + "api/v1/admin@streampipes.de/worker/guess/schema";
+        try {
+            String workerUrl = workerUrlProvider.getWorkerBaseUrl(adapterDescription.getAppId());
 
-        ObjectMapper mapper = JacksonSerializer.getObjectMapper();
+            workerUrl = workerUrl + WorkerPaths.getGuessSchemaPath();
 
-        try {
+            ObjectMapper mapper = JacksonSerializer.getObjectMapper();
             String ad = mapper.writeValueAsString(adapterDescription);
             LOG.info("Guess schema at: " + workerUrl);
             Response requestResponse = Request.Post(workerUrl)
@@ -65,8 +67,7 @@ public class GuessManagement {
             String responseString = EntityUtils.toString(httpResponse.getEntity());
 
             if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
-                GuessSchema guessSchema = mapper.readValue(responseString, GuessSchema.class);
-                return guessSchema;
+                return mapper.readValue(responseString, GuessSchema.class);
             }  else {
                     ErrorMessage errorMessage = mapper.readValue(responseString, ErrorMessage.class);
 
@@ -74,9 +75,9 @@ public class GuessManagement {
                     throw new WorkerAdapterException(errorMessage);
             }
 
-        } catch (IOException e) {
+        } catch (IOException | NoServiceEndpointsAvailableException e) {
             LOG.error(e.getMessage());
-            throw new AdapterException("Error in connect worker: " + workerUrl, e);
+            throw new AdapterException("Error in connect worker: ", e);
         }
     }
 
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index 94f9148..bba0366 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -18,9 +18,11 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
 import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
+import org.apache.streampipes.connect.container.master.util.Utils;
 import org.apache.streampipes.container.html.JSONGenerator;
 import org.apache.streampipes.container.html.model.DataSourceDescriptionHtml;
 import org.apache.streampipes.container.html.model.Description;
@@ -47,6 +49,7 @@ public class SourcesManagement {
     private Logger logger = LoggerFactory.getLogger(SourcesManagement.class);
 
     private AdapterStorageImpl adapterStorage;
+    private WorkerUrlProvider workerUrlProvider;
     private String connectHost = null;
 
     public SourcesManagement(AdapterStorageImpl adapterStorage) {
@@ -54,10 +57,11 @@ public class SourcesManagement {
     }
 
     public SourcesManagement() {
-        this.adapterStorage = new AdapterStorageImpl();
+        this(new AdapterStorageImpl());
+       this.workerUrlProvider = new WorkerUrlProvider();
     }
 
-    public void addAdapter(String streamId, SpDataSet dataSet, String username) throws AdapterException {
+    public void addAdapter(String streamId, SpDataSet dataSet, String username) throws AdapterException, NoServiceEndpointsAvailableException {
 
 
         String newUrl = getAdapterUrl(streamId, username);
@@ -74,7 +78,7 @@ public class SourcesManagement {
         WorkerRestClient.invokeSetAdapter(newUrl, decryptedAdapterDescription);
     }
 
-    public void detachAdapter(String streamId, String runningInstanceId, String username) throws AdapterException {
+    public void detachAdapter(String streamId, String runningInstanceId, String username) throws AdapterException, NoServiceEndpointsAvailableException {
         AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
 
         String newId = adapterDescription.getUri() + "/streams/" + runningInstanceId;
@@ -85,7 +89,7 @@ public class SourcesManagement {
         WorkerRestClient.stopSetAdapter(newUrl, adapterDescription);
     }
 
-    private String getAdapterUrl(String streamId, String username) {
+    private String getAdapterUrl(String streamId, String username) throws NoServiceEndpointsAvailableException {
         String appId = "";
         List<AdapterDescription> adapterDescriptions = this.adapterStorage.getAllAdapters();
         for (AdapterDescription ad : adapterDescriptions) {
@@ -93,7 +97,7 @@ public class SourcesManagement {
                 appId = ad.getAppId();
             }
         }
-        String workerUrl = new Utils().getWorkerUrlById(appId);
+        String workerUrl = workerUrlProvider.getWorkerBaseUrl(appId);
 
         return Utils.addUserNameToApi(workerUrl, username);
 
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/Utils.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/Utils.java
deleted file mode 100644
index bd53be0..0000000
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/Utils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.container.master.management;
-
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
-import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
-
-public class Utils {
-    private WorkerAdministrationManagement workerAdministrationManagement;
-
-    public Utils() {
-        this.workerAdministrationManagement = new WorkerAdministrationManagement();
-    }
-
-    public static String addUserNameToApi(String url, String userName) {
-        return  url + "api/v1/" + userName + "/";
-    }
-
-    public String getWorkerUrl(AdapterDescription adapterDescription) {
-        String id = "";
-
-        if (adapterDescription instanceof GenericAdapterDescription) {
-            id = ((GenericAdapterDescription) (adapterDescription)).getProtocolDescription().getAppId();
-        } else {
-            id = adapterDescription.getAppId();
-        }
-
-        return this.workerAdministrationManagement.getWorkerUrl(id);
-    }
-
-    public String getWorkerUrl(ProtocolDescription protocolDescription) {
-        String id =  protocolDescription.getAppId();
-
-        return this.workerAdministrationManagement.getWorkerUrl(id);
-    }
-
-
-    public String getWorkerUrlById(String id) {
-        return this.workerAdministrationManagement.getWorkerUrl(id);
-    }
-}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
index ed3e99a..74d627e 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
@@ -18,13 +18,14 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
 import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
@@ -48,17 +49,17 @@ public class WorkerAdministrationManagement {
 
         boolean alreadyRegistered = false;
         for (ConnectWorkerContainer c : allConnectWorkerContainers) {
-            if (c.getEndpointUrl().equals(connectWorker.getEndpointUrl())) {
+            if (c.getServiceGroup().equals(connectWorker.getServiceGroup())) {
                 boolean adaptersChanged = false;
 
                 for (AdapterDescription a : c.getAdapters()) {
-                    if (!connectWorker.getAdapters().stream().anyMatch(ad -> ad.getAdapterId().equals(a.getAdapterId()))) {
+                    if (connectWorker.getAdapters().stream().noneMatch(ad -> ad.getAdapterId().equals(a.getAdapterId()))) {
                         adaptersChanged = true;
                     }
                 }
 
                 for (ProtocolDescription p : c.getProtocols()) {
-                    if (!connectWorker.getProtocols().stream().anyMatch(pr -> pr.getAppId().equals(p.getAppId()))) {
+                    if (connectWorker.getProtocols().stream().noneMatch(pr -> pr.getAppId().equals(p.getAppId()))) {
                         adaptersChanged = true;
                     }
                 }
@@ -66,40 +67,29 @@ public class WorkerAdministrationManagement {
                 if (!adaptersChanged) {
                     alreadyRegistered = true;
                 } else {
-                    LOG.info("Remove old connect worker: " + connectWorker.getEndpointUrl());
+                    LOG.info("Remove old connect worker: " + connectWorker.getServiceGroup());
                     this.connectionWorkerContainerStorage.deleteConnectWorkerContainer(c.getId());
                 }
             }
-
         }
 
         // IF NOT REGISTERED
         // Store Connect Worker in DB
         if (!alreadyRegistered) {
             this.connectionWorkerContainerStorage.storeConnectWorkerContainer(connectWorker);
-            LOG.info("Stored new connect worker: " + connectWorker.getEndpointUrl() + " in database");
+            LOG.info("Stored new connect worker: " + connectWorker.getServiceGroup() + " in database");
         } else {
             try {
                 this.adapterMasterManagement.startAllStreamAdapters(connectWorker);
             } catch (AdapterException e) {
-                LOG.error("Could not start adapters on worker: " + connectWorker.getEndpointUrl());
+                LOG.error("Could not start adapters on worker: " + connectWorker.getServiceGroup());
+            } catch (NoServiceEndpointsAvailableException e) {
+                LOG.error("Could not start adapter due to missing endpoint");
             }
         }
     }
 
-    public String getWorkerUrl(String id) {
-        String workerUrl = "";
 
-        List<ConnectWorkerContainer> allConnectWorkerContainer = this.connectionWorkerContainerStorage.getAllConnectWorkerContainers();
 
-        for (ConnectWorkerContainer connectWorkerContainer : allConnectWorkerContainer) {
-            if (connectWorkerContainer.getProtocols().stream().anyMatch(p -> p.getAppId().equals(id))) {
-                workerUrl = connectWorkerContainer.getEndpointUrl();
-            } else if (connectWorkerContainer.getAdapters().stream().anyMatch(a -> a.getAppId().equals(id))) {
-                workerUrl = connectWorkerContainer.getEndpointUrl();
-            }
-        }
 
-        return workerUrl;
-    }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index fb91f5f..cb37eb2 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -22,10 +22,10 @@ import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.ContentType;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
-import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
 import org.apache.streampipes.model.util.Cloner;
@@ -47,37 +47,36 @@ public class WorkerRestClient {
 
     private static final Logger logger = LoggerFactory.getLogger(WorkerRestClient.class);
 
-    public static void invokeStreamAdapter(String baseUrl, String adapterId) throws AdapterException {
-        invokeStreamAdapter(baseUrl, (AdapterStreamDescription) getAndDecryptAdapter(adapterId));
+    public static void invokeStreamAdapter(String endpointUrl, String adapterId) throws AdapterException {
+        invokeStreamAdapter(endpointUrl, (AdapterStreamDescription) getAndDecryptAdapter(adapterId));
     }
 
-    public static void invokeStreamAdapter(String baseUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
-
-        String url = baseUrl + "worker/stream/invoke";
+    public static void invokeStreamAdapter(String endpointUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
+        String url = endpointUrl + WorkerPaths.getStreamInvokePath();
 
         startAdapter(url, adapterStreamDescription);
         updateStreamAdapterStatus(adapterStreamDescription.getId(), true);
     }
 
     public static void stopStreamAdapter(String baseUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
-        String url = baseUrl + "worker/stream/stop";
+        String url = baseUrl + WorkerPaths.getStreamStopPath();
 
         AdapterDescription ad = getAdapterDescriptionById(new AdapterStorageImpl(), adapterStreamDescription.getUri());
 
-        stopAdapter(adapterStreamDescription.getId(), ad, url);
+        stopAdapter(ad, url);
         updateStreamAdapterStatus(adapterStreamDescription.getId(), false);
     }
 
     public static void invokeSetAdapter(String baseUrl, AdapterSetDescription adapterSetDescription) throws AdapterException {
-        String url = baseUrl + "worker/set/invoke";
+        String url = baseUrl + WorkerPaths.getSetInvokePath();
 
         startAdapter(url, adapterSetDescription);
     }
 
     public static void stopSetAdapter(String baseUrl, AdapterSetDescription adapterSetDescription) throws AdapterException {
-        String url = baseUrl + "worker/set/stop";
+        String url = baseUrl + WorkerPaths.getSetStopPath();
 
-        stopAdapter(adapterSetDescription.getUri(), adapterSetDescription, url);
+        stopAdapter(adapterSetDescription, url);
     }
 
     public static void startAdapter(String url, AdapterDescription ad) throws AdapterException {
@@ -106,11 +105,12 @@ public class WorkerRestClient {
     }
 
 
-    public static void stopAdapter(String adapterId, AdapterDescription ad, String url) throws AdapterException {
+    public static void stopAdapter(AdapterDescription ad,
+                                   String url) throws AdapterException {
 
         // Stop execution of adatper
         try {
-            logger.info("Trying to stopAdapter adpater on endpoint: " + url);
+            logger.info("Trying to stopAdapter adapter on endpoint: " + url);
 
             String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(ad);
 
@@ -130,9 +130,10 @@ public class WorkerRestClient {
 
     }
 
-    public static RuntimeOptionsResponse getConfiguration(String workerEndpoint, String elementId, String username, RuntimeOptionsRequest runtimeOptionsRequest) throws AdapterException {
-        String element = encodeValue(elementId);
-        String url = workerEndpoint + "api/v1/" + username + "/worker/resolvable/" + element + "/configurations";
+    public static RuntimeOptionsResponse getConfiguration(String workerEndpoint,
+                                                          String appId,
+                                                          RuntimeOptionsRequest runtimeOptionsRequest) throws AdapterException {
+        String url = workerEndpoint + WorkerPaths.getRuntimeResolvablePath(appId);
 
         try {
             String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest);
@@ -148,20 +149,11 @@ public class WorkerRestClient {
             e.printStackTrace();
             throw new AdapterException("Could not resolve runtime configurations from " + url);
         }
-
-    }
-
-    public static String getAdapterAssets(String baseUrl,  AdapterDescription ad) throws AdapterException {
-        return getAssets(baseUrl + "worker/adapters", ad.getAppId());
     }
 
-    public static String getProtocolAssets(String baseUrl,  ProtocolDescription ad) throws AdapterException {
-        return getAssets(baseUrl + "worker/protocol", ad.getAppId());
-    }
-
-    private static String getAssets(String baseUrl,  String  appId) throws AdapterException {
-        String url = baseUrl + "/" + appId + "/assets";
-        logger.info("Trying to Assets from endpoint: " + url + " for adapter: " + appId);
+    public static String getAssets(String workerPath) throws AdapterException {
+        String url = workerPath + "/assets";
+        logger.info("Trying to Assets from endpoint: " + url);
 
         try {
             return Request.Get(url)
@@ -170,22 +162,13 @@ public class WorkerRestClient {
                     .execute().returnContent().asString();
         } catch (IOException e) {
             logger.error(e.getMessage());
-            throw new AdapterException("Could not get assets endpoint: " + url + " for adapter: " + appId);
+            throw new AdapterException("Could not get assets endpoint: " + url);
         }
 
     }
 
-    public static byte[] getAdapterIconAsset(String baseUrl,  AdapterDescription ad) throws AdapterException {
-        return getIconAsset(baseUrl + "worker/adapters", ad.getAppId());
-    }
-
-    public static byte[] getProtocolIconAsset(String baseUrl,  ProtocolDescription ad) throws AdapterException {
-        return getIconAsset(baseUrl + "worker/protocols", ad.getAppId());
-
-    }
-
-    private static byte[] getIconAsset(String baseUrl,  String appId) throws AdapterException {
-        String url = baseUrl + "/" + appId + "/assets/icon";
+    public static byte[] getIconAsset(String baseUrl) throws AdapterException {
+        String url = baseUrl + "/assets/icon";
 
         try {
             byte[] responseString = Request.Get(url)
@@ -199,16 +182,8 @@ public class WorkerRestClient {
         }
     }
 
-    public static String getAdapterDocumentationAsset(String baseUrl,  AdapterDescription ad) throws AdapterException {
-        return getDocumentationAsset(baseUrl + "worker/adapters", ad.getAppId());
-    }
-
-    public static String getProtocolDocumentationAsset(String baseUrl,  ProtocolDescription ad) throws AdapterException {
-        return getDocumentationAsset(baseUrl + "worker/protocols", ad.getAppId());
-    }
-
-    private static String getDocumentationAsset(String baseUrl,  String appId) throws AdapterException  {
-        String url = baseUrl + "/" + appId + "/assets/documentation";
+    public static String getDocumentationAsset(String baseUrl) throws AdapterException  {
+        String url = baseUrl + "/assets/documentation";
        
         try {
             return Request.Get(url)
@@ -217,7 +192,7 @@ public class WorkerRestClient {
                     .execute().returnContent().asString();
         } catch (IOException e) {
             logger.error(e.getMessage());
-            throw new AdapterException("Could not get documentation endpoint: " + url + " for adapter: " + appId);
+            throw new AdapterException("Could not get documentation endpoint: " + url);
         }
     }
 
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java
new file mode 100644
index 0000000..efc5b4c
--- /dev/null
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerUrlProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.container.master.management;
+
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.manager.execution.http.ExtensionsServiceEndpointGenerator;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
+import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
+import org.apache.streampipes.storage.api.IConnectWorkerContainerStorage;
+import org.apache.streampipes.storage.couchdb.impl.ConnectionWorkerContainerStorageImpl;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+
+import java.util.List;
+
+/** Resolves worker endpoint URLs for adapter and protocol app ids via {@link ExtensionsServiceEndpointGenerator}. */
+public class WorkerUrlProvider {
+
+  private final IConnectWorkerContainerStorage connectWorkerContainerStorage;
+
+  public WorkerUrlProvider() {
+    this.connectWorkerContainerStorage = new ConnectionWorkerContainerStorageImpl();
+  }
+
+  /** Uses the protocol's app id for generic adapters and the adapter's own app id otherwise. */
+  public String getWorkerUrlForAdapter(AdapterDescription adapterDescription) throws NoServiceEndpointsAvailableException {
+    String id = adapterDescription instanceof GenericAdapterDescription
+        ? ((GenericAdapterDescription) adapterDescription).getProtocolDescription().getAppId()
+        : adapterDescription.getAppId();
+    return getWorkerUrl(id);
+  }
+
+  /** Resolves the worker URL from the protocol's app id. */
+  public String getWorkerUrlForProtocol(ProtocolDescription protocolDescription) throws NoServiceEndpointsAvailableException {
+    return getWorkerUrl(protocolDescription.getAppId());
+  }
+
+  /** Returns the endpoint resource URL produced by the endpoint generator for {@code appId}. */
+  public String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException {
+    return getEndpointGenerator(appId).getEndpointResourceUrl();
+  }
+
+  /** Returns the endpoint base URL produced by the endpoint generator for {@code appId}. */
+  public String getWorkerBaseUrl(String appId) throws NoServiceEndpointsAvailableException {
+    return getEndpointGenerator(appId).getEndpointBaseUrl();
+  }
+
+  /**
+   * Picks PROTOCOL or ADAPTER for {@code appId} from the stored worker containers; the last matching container wins.
+   * NOTE(review): provider stays null when no container lists the app id - confirm the generator tolerates that.
+   */
+  private ExtensionsServiceEndpointGenerator getEndpointGenerator(String appId) {
+    SpServiceUrlProvider provider = null;
+    List<ConnectWorkerContainer> containers = this.connectWorkerContainerStorage.getAllConnectWorkerContainers();
+    for (ConnectWorkerContainer container : containers) {
+      if (container.getProtocols().stream().anyMatch(p -> p.getAppId().equals(appId))) {
+        provider = SpServiceUrlProvider.PROTOCOL;
+      } else if (container.getAdapters().stream().anyMatch(a -> a.getAppId().equals(appId))) {
+        provider = SpServiceUrlProvider.ADAPTER;
+      }
+    }
+
+    return new ExtensionsServiceEndpointGenerator(appId, provider);
+  }
+}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java
index 8a9887d..b887d98 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/AdapterResource.java
@@ -18,9 +18,10 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
-import org.apache.streampipes.connect.container.master.management.Utils;
+import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
 import org.apache.streampipes.model.message.Notifications;
@@ -37,9 +38,11 @@ import java.util.List;
 public class AdapterResource extends AbstractAdapterResource<AdapterMasterManagement> {
 
     private Logger LOG = LoggerFactory.getLogger(AdapterResource.class);
+    private WorkerUrlProvider workerUrlProvider;
 
     public AdapterResource() {
         super(AdapterMasterManagement::new);
+        this.workerUrlProvider = new WorkerUrlProvider();
     }
 
     @POST
@@ -52,9 +55,9 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
         LOG.info("User: " + username + " starts adapter " + adapterDescription.getAdapterId());
 
         try {
-            String workerUrl = getModifiedWorkerUrl(adapterDescription, username);
-            adapterId = managementService.addAdapter(adapterDescription, workerUrl, username);
-        } catch (AdapterException e) {
+            String workerBaseUrl = workerUrlProvider.getWorkerBaseUrl(adapterDescription.getAppId());
+            adapterId = managementService.addAdapter(adapterDescription, workerBaseUrl, username);
+        } catch (AdapterException | NoServiceEndpointsAvailableException e) {
             LOG.error("Error while starting adapter with id " + adapterDescription.getAppId(), e);
             return ok(Notifications.error(e.getMessage()));
         }
@@ -86,8 +89,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
     public Response stopAdapter(@PathParam("id") String adapterId,
                                 @PathParam("username") String username) {
         try {
-            String workerUrl = getModifiedWorkerUrl(adapterId, username);
-            managementService.stopStreamAdapter(adapterId, workerUrl);
+            managementService.stopStreamAdapter(adapterId, getAdapterDescription(adapterId).getSelectedEndpointUrl());
             return ok(Notifications.success("Adapter started"));
         } catch (AdapterException e) {
             LOG.error("Could not stop adapter with id " +adapterId, e);
@@ -102,10 +104,10 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
     public Response startAdapter(@PathParam("id") String adapterId,
                                 @PathParam("username") String username) {
         try {
-            String workerUrl = getModifiedWorkerUrl(adapterId, username);
+            String workerUrl =  workerUrlProvider.getWorkerBaseUrl(getAdapterDescription(adapterId).getAppId());
             managementService.startStreamAdapter(adapterId, workerUrl);
             return ok(Notifications.success("Adapter stopped"));
-        } catch (AdapterException e) {
+        } catch (AdapterException | NoServiceEndpointsAvailableException e) {
             LOG.error("Could not start adapter with id " +adapterId, e);
             return ok(Notifications.error(e.getMessage()));
         }
@@ -119,8 +121,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
                                   @PathParam("username") String username) {
 
         try {
-            String workerUrl = getModifiedWorkerUrl(adapterId, username);
-            managementService.deleteAdapter(adapterId, workerUrl);
+            managementService.deleteAdapter(adapterId);
             return ok(Notifications.success("Adapter deleted."));
         } catch (AdapterException e) {
             LOG.error("Error while deleting adapter with id " + adapterId, e);
@@ -148,15 +149,4 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
         return managementService.getAdapter(adapterId);
     }
 
-    private String getModifiedWorkerUrl(String adapterId,
-                                        String username) throws AdapterException {
-        AdapterDescription adapterDescription = getAdapterDescription(adapterId);
-        return getModifiedWorkerUrl(adapterDescription, username);
-    }
-
-    private String getModifiedWorkerUrl(AdapterDescription adapterDescription,
-                                        String username) throws AdapterException {
-        String workerUrl = new Utils().getWorkerUrlById(adapterDescription.getAppId());
-        return Utils.addUserNameToApi(workerUrl, username);
-    }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java
index 7132e09..a247193 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/DescriptionResource.java
@@ -17,9 +17,10 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.DescriptionManagement;
-import org.apache.streampipes.connect.container.master.management.Utils;
+import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterDescriptionList;
 import org.apache.streampipes.model.connect.grounding.FormatDescriptionList;
@@ -41,9 +42,11 @@ import java.util.Optional;
 public class DescriptionResource extends AbstractAdapterResource<DescriptionManagement> {
 
     private static final Logger LOG = LoggerFactory.getLogger(DescriptionResource.class);
+    private WorkerUrlProvider workerUrlProvider;
 
     public DescriptionResource() {
         super(DescriptionManagement::new);
+        workerUrlProvider = new WorkerUrlProvider();
     }
 
     @GET
@@ -86,19 +89,17 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
             Optional<AdapterDescription> adapterDescriptionOptional = managementService.getAdapter(id);
             if (adapterDescriptionOptional.isPresent()) {
                 AdapterDescription adapterDescription = adapterDescriptionOptional.get();
-                String workerUrl = new Utils().getWorkerUrl(adapterDescription);
-                String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+                String workerUrl = workerUrlProvider.getWorkerUrlForAdapter(adapterDescription);
 
-                result = managementService.getAdapterAssets(adapterDescription, newUrl);
+                result = managementService.getAssets(workerUrl);
             }
 
             Optional<ProtocolDescription> protocolDescriptionOptional  = managementService.getProtocol(id);
             if (protocolDescriptionOptional.isPresent()) {
                 ProtocolDescription protocolDescription = protocolDescriptionOptional.get();
-                String workerUrl = new Utils().getWorkerUrl(protocolDescription);
-                String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+                String workerUrl = workerUrlProvider.getWorkerUrlForProtocol(protocolDescription);
 
-                result = managementService.getProtocolAssets(protocolDescription, newUrl);
+                result = managementService.getAssets(workerUrl);
             }
 
             if (result == null) {
@@ -110,6 +111,8 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
         } catch (AdapterException e) {
             LOG.error("Not found adapter with id " + id, e);
             return fail();
+        } catch (NoServiceEndpointsAvailableException e) {
+            return fail();
         }
     }
 
@@ -124,19 +127,17 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
             Optional<AdapterDescription> adapterDescriptionOptional = managementService.getAdapter(id);
             if (adapterDescriptionOptional.isPresent()) {
                 AdapterDescription adapterDescription = adapterDescriptionOptional.get();
-                String workerUrl = new Utils().getWorkerUrl(adapterDescription);
-                String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+                String workerUrl = workerUrlProvider.getWorkerUrlForAdapter(adapterDescription);
 
-                result = managementService.getAdapterIconAsset(adapterDescription, newUrl);
+                result = managementService.getIconAsset(workerUrl);
             }
 
             Optional<ProtocolDescription> protocolDescriptionOptional  = managementService.getProtocol(id);
             if (protocolDescriptionOptional.isPresent()) {
                 ProtocolDescription protocolDescription = protocolDescriptionOptional.get();
-                String workerUrl = new Utils().getWorkerUrl(protocolDescription);
-                String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+                String workerUrl = workerUrlProvider.getWorkerUrlForProtocol(protocolDescription);
 
-                result = managementService.getProtocolIconAsset(protocolDescription, newUrl);
+                result = managementService.getIconAsset(workerUrl);
             }
 
             if (result == null) {
@@ -148,6 +149,8 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
         } catch (AdapterException e) {
             LOG.error("Not found adapter with id " + id);
             return fail();
+        } catch (NoServiceEndpointsAvailableException e) {
+            return fail();
         }
     }
 
@@ -161,19 +164,17 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
             Optional<AdapterDescription> adapterDescriptionOptional = managementService.getAdapter(id);
             if (adapterDescriptionOptional.isPresent()) {
                 AdapterDescription adapterDescription = adapterDescriptionOptional.get();
-                String workerUrl = new Utils().getWorkerUrl(adapterDescription);
-                String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+                String workerUrl = workerUrlProvider.getWorkerUrlForAdapter(adapterDescription);
 
-                result =  managementService.getAdapterDocumentationAsset(adapterDescription, newUrl);
+                result =  managementService.getDocumentationAsset(workerUrl);
             }
 
             Optional<ProtocolDescription> protocolDescriptionOptional  = managementService.getProtocol(id);
             if (protocolDescriptionOptional.isPresent()) {
                 ProtocolDescription protocolDescription = protocolDescriptionOptional.get();
-                String workerUrl = new Utils().getWorkerUrl(protocolDescription);
-                String newUrl = Utils.addUserNameToApi(workerUrl, userName);
+                String workerUrl = workerUrlProvider.getWorkerUrlForProtocol(protocolDescription);
 
-                result =  managementService.getProtocolDocumentationAsset(protocolDescription, newUrl);
+                result =  managementService.getDocumentationAsset(workerUrl);
             }
 
             if (result == null) {
@@ -185,6 +186,8 @@ public class DescriptionResource extends AbstractAdapterResource<DescriptionMana
         } catch (AdapterException e) {
             LOG.error("Not found adapter with id " + id, e);
             return fail();
+        } catch (NoServiceEndpointsAvailableException e) {
+            return fail();
         }
     }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java
index df640df..0294dcb 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/RuntimeResolvableResource.java
@@ -18,9 +18,11 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.WorkerAdministrationManagement;
 import org.apache.streampipes.connect.container.master.management.WorkerRestClient;
+import org.apache.streampipes.connect.container.master.management.WorkerUrlProvider;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -33,9 +35,11 @@ import javax.ws.rs.core.Response;
 public class RuntimeResolvableResource extends AbstractAdapterResource<WorkerAdministrationManagement> {
 
     private static final String SP_NS =  "https://streampipes.org/vocabulary/v1/";
+    private WorkerUrlProvider workerUrlProvider;
 
     public RuntimeResolvableResource() {
         super(WorkerAdministrationManagement::new);
+        this.workerUrlProvider = new WorkerUrlProvider();
     }
 
     @POST
@@ -43,22 +47,18 @@ public class RuntimeResolvableResource extends AbstractAdapterResource<WorkerAdm
     @JacksonSerialized
     @Produces(MediaType.APPLICATION_JSON)
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response fetchConfigurations(@PathParam("id") String elementId,
+    public Response fetchConfigurations(@PathParam("id") String appId,
                                         @PathParam("username") String username,
                                         RuntimeOptionsRequest runtimeOptionsRequest) {
 
         // TODO add solution for formats
-//        ResolvesContainerProvidedOptions runtimeResolvableOptions = RuntimeResovable.getRuntimeResolvableFormat(elementId);
-
-        String id = elementId.replaceAll("sp:", SP_NS);
-        String workerEndpoint = managementService.getWorkerUrl(id);
 
         try {
-
-            RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(workerEndpoint, elementId, username, runtimeOptionsRequest);
+            String workerEndpoint = workerUrlProvider.getWorkerBaseUrl(appId);
+            RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(workerEndpoint, appId, runtimeOptionsRequest);
 
             return ok(result);
-        } catch (AdapterException e) {
+        } catch (AdapterException | NoServiceEndpointsAvailableException e) {
             e.printStackTrace();
             return fail();
         }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
index 9cfb19b..7d5e678 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/SourcesResource.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.container.master.rest;
 
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.SourcesManagement;
 import org.apache.streampipes.model.SpDataSet;
@@ -79,7 +80,7 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
 
         try {
             managementService.addAdapter(elementId,  dataSet, username);
-        } catch (AdapterException e) {
+        } catch (AdapterException | NoServiceEndpointsAvailableException e) {
             LOG.error("Could not set data set instance: " + dataSet.getUri(), e);
             return ok(Notifications.error("Could not set data set instance: " + dataSet.getUri()));
         }
@@ -98,7 +99,7 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
 
         try {
             managementService.detachAdapter(elementId, runningInstanceId, username);
-        } catch (AdapterException e) {
+        } catch (AdapterException | NoServiceEndpointsAvailableException e) {
             LOG.error("Could not set set id "+ elementId  + " with instance id: "+ runningInstanceId, e);
             return fail();
         }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WorkerAdministrationResource.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WorkerAdministrationResource.java
index d421f4c..b601d60 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WorkerAdministrationResource.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/rest/WorkerAdministrationResource.java
@@ -47,7 +47,7 @@ public class WorkerAdministrationResource extends AbstractSharedRestInterface {
     @JacksonSerialized
     @Produces(MediaType.APPLICATION_JSON)
     public Response addWorkerContainer(ConnectWorkerContainer connectWorkerContainer) {
-        LOG.info("Worker container: " + connectWorkerContainer.getEndpointUrl() + " was detected");
+        LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
         this.workerAdministrationManagement.register(connectWorkerContainer);
 
         return ok(Notifications.success("Worker Container sucessfully added"));
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/Utils.java
similarity index 72%
copy from streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java
copy to streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/Utils.java
index 861bbe3..846944f 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/Utils.java
@@ -16,17 +16,14 @@
  *
  */
 
-package org.apache.streampipes.storage.api;
+package org.apache.streampipes.connect.container.master.util;
 
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
+public class Utils {
 
-import java.util.List;
+    public static String addUserNameToApi(String url, String userName) {
+        // Appends "/api/v1/<userName>/" to the given url; both arguments are concatenated as-is (no escaping, no null check).
+        return  url + "/api/v1/" + userName + "/";
+    }
 
-public interface IRdfEndpointStorage {
 
-    void addRdfEndpoint(RdfEndpoint rdfEndpoint);
-
-    void removeRdfEndpoint(String rdfEndpointId);
-
-    List<RdfEndpoint> getRdfEndpoints();
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
new file mode 100644
index 0000000..2b5a624
--- /dev/null
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.container.master.util;
+
+public class WorkerPaths {
+  // Static builders for worker REST paths; every returned value is a relative path that starts with WorkerMainPath.
+  private static final String WorkerMainPath = "/api/v1/worker";
+  private static final String Slash = "/";
+  // Returns "/api/v1/worker/stream/invoke" (by its name, the call that starts a stream adapter - confirm against the worker's REST resource).
+  public static String getStreamInvokePath() {
+    return WorkerMainPath + "/stream/invoke";
+  }
+  // Returns "/api/v1/worker/stream/stop".
+  public static String getStreamStopPath() {
+    return WorkerMainPath + "/stream/stop";
+  }
+  // Returns "/api/v1/worker/set/invoke".
+  public static String getSetInvokePath() {
+    return WorkerMainPath +  "/set/invoke";
+  }
+  // Returns "/api/v1/worker/set/stop".
+  public static String getSetStopPath() {
+    return WorkerMainPath + "/set/stop";
+  }
+  // Returns "/api/v1/worker/resolvable/<elementId>/configurations"; elementId is inserted as-is, without URL encoding.
+  public static String getRuntimeResolvablePath(String elementId) {
+    return WorkerMainPath + "/resolvable/" + elementId + "/configurations";
+  }
+  // Returns "/api/v1/worker/adapters", the base path that getAdaptersPath(String) extends.
+  public static String getAdaptersPath() {
+    return WorkerMainPath +  "/adapters";
+  }
+  // Returns "/api/v1/worker/protocols", the base path that getProtocolsPath(String) extends.
+  public static String getProtocolsPath() {
+    return WorkerMainPath + "/protocols";
+  }
+  // Returns "/api/v1/worker/adapters/<appId>"; appId is appended as-is, without URL encoding.
+  public static String getAdaptersPath(String appId) {
+    return getAdaptersPath() + Slash + appId;
+  }
+  // Returns "/api/v1/worker/protocols/<appId>"; appId is appended as-is, without URL encoding.
+  public static String getProtocolsPath(String appId) {
+    return getProtocolsPath() + Slash + appId;
+  }
+  // Returns "/api/v1/worker/guess/schema".
+  public static String getGuessSchemaPath() {
+    return WorkerMainPath + "/guess/schema";
+  }
+
+
+
+
+}
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java
index ae186f3..eae5336 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/WorkerRestClientTest.java
@@ -59,7 +59,7 @@ public class WorkerRestClientTest {
         WorkerRestClient.stopStreamAdapter("", description);
 
         verifyStatic(WorkerRestClient.class, times(1));
-        WorkerRestClient.stopAdapter(anyString(), any(), eq("worker/stream/stop"));
+        WorkerRestClient.stopAdapter(any(), eq("worker/stream/stop"));
 
     }
 
@@ -109,7 +109,7 @@ public class WorkerRestClientTest {
         WorkerRestClient.stopSetAdapter("", description);
 
         verifyStatic(WorkerRestClient.class, times(1));
-        WorkerRestClient.stopAdapter(anyString(), any(), eq("worker/set/stop"));
+        WorkerRestClient.stopAdapter(any(), eq("worker/set/stop"));
 
     }
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java
index 9b69ebd..2db077a 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterServiceResourceProvider.java
@@ -28,7 +28,7 @@ public class AdapterServiceResourceProvider implements ExtensionsResourceProvide
 
   @Override
   public List<Class<?>> getResourceClasses() {
-    return Arrays.asList(//WelcomePageWorker.class,
+    return Arrays.asList(WelcomePageWorker.class,
             GuessResource.class,
             RuntimeResolvableResource.class,
             WorkerResource.class,
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
index b6b3f6a..4f434f8 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/AdapterWorkerContainer.java
@@ -49,11 +49,16 @@ public abstract class AdapterWorkerContainer extends StreamPipesExtensionsServic
 
   @Override
   public void afterServiceRegistered(SpServiceDefinition serviceDef) {
-    new ConnectWorkerRegistrationService().registerWorker();
+    new ConnectWorkerRegistrationService().registerWorker(serviceDef.getServiceGroup());
   }
 
   @Override
   public void onExit() {
     deregisterService(DeclarersSingleton.getInstance().getServiceId());
   }
+
+  @Override
+  protected String getHealthCheckPath() {
+    return "/worker"; // same value as the @Path("/worker") on WelcomePageWorker; presumably the health check probes that page - confirm
+  }
 }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
index ac98daf..1ec7506 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerDescriptionProvider.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.locales.LabelGenerator;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.worker.ConnectWorkerContainer;
 import org.slf4j.Logger;
@@ -37,32 +36,24 @@ public class ConnectWorkerDescriptionProvider {
 
   private static final Logger LOG = LoggerFactory.getLogger(ConnectWorkerDescriptionProvider.class);
 
-  public ConnectWorkerContainer getContainerDescription(String endpointUrl) {
+  public ConnectWorkerContainer getContainerDescription(String serviceGroup) {
 
     List<AdapterDescription> adapters = new ArrayList<>();
     for (IAdapter<?> a : DeclarersSingleton.getInstance().getAllAdapters()) {
-      AdapterDescription desc = (AdapterDescription) rewrite(a.declareModel(), endpointUrl);
+      AdapterDescription desc = (AdapterDescription) rewrite(a.declareModel());
       adapters.add(desc);
     }
 
     List<ProtocolDescription> protocols = new ArrayList<>();
     for (IProtocol p : DeclarersSingleton.getInstance().getAllProtocols()) {
-      ProtocolDescription desc = (ProtocolDescription) rewrite(p.declareModel(), endpointUrl);
+      ProtocolDescription desc = (ProtocolDescription) rewrite(p.declareModel());
       protocols.add(desc);
     }
 
-    return new ConnectWorkerContainer(endpointUrl, protocols, adapters);
+    return new ConnectWorkerContainer(serviceGroup, protocols, adapters);
   }
 
-  private NamedStreamPipesEntity rewrite(NamedStreamPipesEntity entity, String endpointUrl) {
-    if (!(entity instanceof GenericAdapterDescription)) {
-      if (entity instanceof  ProtocolDescription) {
-        entity.setElementId(endpointUrl +  "protocol/" + entity.getElementId());
-      } else if (entity instanceof  AdapterDescription) {
-        entity.setElementId(endpointUrl + "adapter/" + entity.getElementId());
-      }
-    }
-
+  private NamedStreamPipesEntity rewrite(NamedStreamPipesEntity entity) {
     // TODO remove after full internationalization support has been implemented
     if (entity.isIncludesLocales()) {
       LabelGenerator lg = new LabelGenerator(entity);
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
index ab89a05..e23d1ed 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
@@ -18,7 +18,6 @@
 package org.apache.streampipes.connect.container.worker.init;
 
 import org.apache.streampipes.connect.container.worker.management.MasterRestClient;
-import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,18 +28,14 @@ public class ConnectWorkerRegistrationService {
 
   private static final Logger LOG = LoggerFactory.getLogger(ConnectWorkerRegistrationService.class);
 
-  public void registerWorker() {
+  public void registerWorker(String serviceGroup) {
     String masterUrl = getConnectMasterUrl() + "/streampipes-backend";
-    String workerUrl = "http://"
-            + DeclarersSingleton.getInstance().getHostName()
-            + ":" + DeclarersSingleton.getInstance().getPort() + "/";
-
     boolean connected = false;
 
     while (!connected) {
       LOG.info("Trying to connect to master: " + masterUrl);
       connected = MasterRestClient.register(masterUrl,
-              new ConnectWorkerDescriptionProvider().getContainerDescription(workerUrl));
+              new ConnectWorkerDescriptionProvider().getContainerDescription(serviceGroup));
 
       if (!connected) {
         LOG.info("Retrying in 5 seconds");
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
index d261c89..107cfaa 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerTagProvider.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.util.ServiceDefinitionUtil;
 import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -37,7 +36,7 @@ public class ConnectWorkerTagProvider {
     Collection<IProtocol> protocols = DeclarersSingleton.getInstance().getAllProtocols();
     tags.addAll(ServiceDefinitionUtil.extractAppIdsFromAdapters(adapters));
     tags.addAll(ServiceDefinitionUtil.extractAppIdsFromProtocols(protocols));
-    tags.add(SpServiceTag.create(SpServiceTagPrefix.SYSTEM, DefaultSpServiceTags.CONNECT_WORKER));
+    tags.add(DefaultSpServiceTags.CONNECT_WORKER);
 
     return tags;
   }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterResource.java
index 724924d..0c0d4f9 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterResource.java
@@ -34,7 +34,7 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.List;
 
-@Path("/api/v1/{username}/worker/adapters")
+@Path("/api/v1/worker/adapters")
 public class AdapterResource extends AbstractSharedRestInterface {
 
     private AdapterWorkerManagement adapterWorkerManagement;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
index 8642e86..228dac8 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/GuessResource.java
@@ -36,7 +36,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 
-@Path("/api/v1/{username}/worker/guess")
+@Path("/api/v1/worker/guess")
 public class GuessResource extends AbstractSharedRestInterface {
 
   private static final Logger logger = LoggerFactory.getLogger(GuessResource.class);
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/HttpServerAdapterResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/HttpServerAdapterResource.java
index edcd7d8..7a4f1b3 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/HttpServerAdapterResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/HttpServerAdapterResource.java
@@ -24,7 +24,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.core.Response;
 
-@Path("/api/v1/{username}/worker/live")
+@Path("/api/v1/worker/live")
 public class HttpServerAdapterResource {
 
   @POST
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/ProtocolResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/ProtocolResource.java
index 5d8fade..5dbd005 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/ProtocolResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/ProtocolResource.java
@@ -34,7 +34,7 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.List;
 
-@Path("/api/v1/{username}/worker/protocols")
+@Path("/api/v1/worker/protocols")
 public class ProtocolResource extends AbstractSharedRestInterface {
 
     private AdapterWorkerManagement adapterWorkerManagement;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java
index 0f3d17f..dd8417f 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java
@@ -32,7 +32,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.List;
 
-@Path("/api/v1/{username}/worker/resolvable")
+@Path("/api/v1/worker/resolvable")
 public class RuntimeResolvableResource extends AbstractSharedRestInterface {
 
     @POST
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java
index 7237360..c07ec73 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WelcomePageWorker.java
@@ -32,7 +32,7 @@ import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.util.Collection;
 
-@Path("/")
+@Path("/worker")
 public class WelcomePageWorker extends AbstractSharedRestInterface {
 
     private String id;
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java
index 1397fd2..cbc0537 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/WorkerResource.java
@@ -35,7 +35,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-@Path("/api/v1/{username}/worker")
+@Path("/api/v1/worker")
 public class WorkerResource extends AbstractSharedRestInterface {
 
     private static final Logger logger = LoggerFactory.getLogger(WorkerResource.class);
diff --git a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
index e915775..6ebf29c 100644
--- a/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
+++ b/streampipes-container-base/src/main/java/org/apache/streampipes/container/base/StreamPipesServiceBase.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.container.base;
 
 import org.apache.streampipes.commons.networking.Networking;
 import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,13 +54,17 @@ public abstract class StreamPipesServiceBase {
   private void registerService(String serviceGroup,
                                String serviceId,
                                Integer defaultPort) throws UnknownHostException {
+    SpServiceRegistrationRequest req = SpServiceRegistrationRequest.from(
+            serviceGroup,
+            serviceId,
+            getHostname(),
+            getPort(defaultPort),
+            getServiceTags(),
+            getHealthCheckPath());
+
     SpServiceDiscovery
             .getServiceDiscovery()
-            .registerService(serviceGroup,
-                    serviceId,
-                    getHostname(),
-                    getPort(defaultPort),
-                    getServiceTags());
+            .registerService(req);
   }
 
   protected String getHostname() throws UnknownHostException {
@@ -77,4 +82,8 @@ public abstract class StreamPipesServiceBase {
     SpServiceDiscovery.getServiceDiscovery().deregisterService(serviceId);
   }
 
+  protected String getHealthCheckPath() {
+    return ""; // default is the empty path; registerService passes this value to SpServiceRegistrationRequest.from(...), subclasses may override
+  }
+
 }
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
index ac9f572..82addbe 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsModelSubmitter.java
@@ -50,7 +50,7 @@ public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServ
 
     @Override
     public void afterServiceRegistered(SpServiceDefinition serviceDef) {
-        new ConnectWorkerRegistrationService().registerWorker();
+        new ConnectWorkerRegistrationService().registerWorker(serviceDef.getServiceGroup());
     }
 
     @Override
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
index 44ff9ba..793e8ef 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementServiceTagProvider.java
@@ -22,7 +22,6 @@ import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.util.ServiceDefinitionUtil;
 import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceTagPrefix;
 
 import java.util.Collection;
 import java.util.List;
@@ -32,7 +31,7 @@ public class PipelineElementServiceTagProvider {
   public List<SpServiceTag> extractServiceTags() {
     Collection<Declarer<?>> declarers = DeclarersSingleton.getInstance().getDeclarers().values();
     List<SpServiceTag> serviceTags = ServiceDefinitionUtil.extractAppIds(declarers);
-    serviceTags.add(SpServiceTag.create(SpServiceTagPrefix.SYSTEM, DefaultSpServiceTags.PE));
+    serviceTags.add(DefaultSpServiceTags.PE);
 
     return serviceTags;
   }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataProcessorPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataProcessorPipelineElementResource.java
index 34f64dd..3d33348 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataProcessorPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataProcessorPipelineElementResource.java
@@ -19,7 +19,7 @@
 package org.apache.streampipes.container.api;
 
 import org.apache.streampipes.commons.constants.InstanceIdExtractor;
-import org.apache.streampipes.commons.constants.PipelineElementPrefix;
+import org.apache.streampipes.svcdiscovery.api.model.SpServicePathPrefix;
 import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -31,7 +31,7 @@ import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import javax.ws.rs.Path;
 import java.util.Map;
 
-@Path(PipelineElementPrefix.DATA_PROCESSOR)
+@Path(SpServicePathPrefix.DATA_PROCESSOR)
 public class DataProcessorPipelineElementResource extends InvocablePipelineElementResource<DataProcessorInvocation,
         SemanticEventProcessingAgentDeclarer, ProcessingElementParameterExtractor> {
 
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java
index ae46ae1..1303216 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataSinkPipelineElementResource.java
@@ -19,7 +19,7 @@
 package org.apache.streampipes.container.api;
 
 import org.apache.streampipes.commons.constants.InstanceIdExtractor;
-import org.apache.streampipes.commons.constants.PipelineElementPrefix;
+import org.apache.streampipes.svcdiscovery.api.model.SpServicePathPrefix;
 import org.apache.streampipes.container.declarer.SemanticEventConsumerDeclarer;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -30,7 +30,7 @@ import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
 import javax.ws.rs.Path;
 import java.util.Map;
 
-@Path(PipelineElementPrefix.DATA_SINK)
+@Path(SpServicePathPrefix.DATA_SINK)
 public class DataSinkPipelineElementResource extends InvocablePipelineElementResource<DataSinkInvocation,
         SemanticEventConsumerDeclarer, DataSinkParameterExtractor> {
 
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java
index 063a823..23421e0 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/DataStreamPipelineElementResource.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.container.api;
 
-import org.apache.streampipes.commons.constants.PipelineElementPrefix;
+import org.apache.streampipes.svcdiscovery.api.model.SpServicePathPrefix;
 import org.apache.streampipes.container.assets.AssetZipGenerator;
 import org.apache.streampipes.container.declarer.DataSetDeclarer;
 import org.apache.streampipes.container.declarer.DataStreamDeclarer;
@@ -33,7 +33,7 @@ import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.util.Map;
 
-@Path(PipelineElementPrefix.DATA_STREAM)
+@Path(SpServicePathPrefix.DATA_STREAM)
 public class DataStreamPipelineElementResource extends AbstractPipelineElementResource<DataStreamDeclarer> {
 
   @Override
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGenerator.java b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGenerator.java
index ecbce27..140c1fc 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGenerator.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/html/page/WelcomePageGenerator.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.sdk.utils.Assets;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -72,8 +73,10 @@ public class WelcomePageGenerator {
     desc.setElementId(entity.getElementId());
     desc.setAppId(entity.getAppId());
     desc.setEditable(!(entity.isInternallyManaged()));
-    desc.setIncludesDocs(entity.isIncludesAssets());
-    desc.setIncludesIcon(entity.isIncludesAssets());
+    desc.setIncludesDocs(entity.isIncludesAssets()
+            && entity.getIncludedAssets().contains(Assets.DOCUMENTATION));
+    desc.setIncludesIcon(entity.isIncludesAssets()
+            && entity.getIncludedAssets().contains(Assets.ICON));
     String uri = baseUri;
     if (declarer instanceof SemanticEventConsumerDeclarer) {
       uri += "sec/";
diff --git a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpoint.java b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpoint.java
similarity index 91%
rename from streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpoint.java
rename to streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpoint.java
index 74c07e1..f164d80 100644
--- a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpoint.java
+++ b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpoint.java
@@ -20,18 +20,18 @@ package org.apache.streampipes.model.client.endpoint;
 
 import com.google.gson.annotations.SerializedName;
 
-public class RdfEndpoint {
+public class ExtensionsServiceEndpoint {
 
     private @SerializedName("_id") String id;
     private @SerializedName("_rev") String rev;
 
     private String endpointUrl;
 
-    public RdfEndpoint() {
+    public ExtensionsServiceEndpoint() {
 
     }
 
-    public RdfEndpoint(String endpointUrl) {
+    public ExtensionsServiceEndpoint(String endpointUrl) {
         this.endpointUrl = endpointUrl;
     }
 
diff --git a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpointItem.java b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpointItem.java
similarity index 90%
rename from streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpointItem.java
rename to streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpointItem.java
index 54863ab..12dc8e9 100644
--- a/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/RdfEndpointItem.java
+++ b/streampipes-model-client/src/main/java/org/apache/streampipes/model/client/endpoint/ExtensionsServiceEndpointItem.java
@@ -23,7 +23,7 @@ import org.apache.streampipes.model.shared.annotation.TsModel;
 import java.util.List;
 
 @TsModel
-public class RdfEndpointItem {
+public class ExtensionsServiceEndpointItem {
 
     private String name;
     private String description;
@@ -38,9 +38,9 @@ public class RdfEndpointItem {
     private boolean installed;
     private boolean editable;
 
-    private List<RdfEndpointItem> streams;
+    private List<ExtensionsServiceEndpointItem> streams;
 
-    public RdfEndpointItem() {
+    public ExtensionsServiceEndpointItem() {
 
     }
 
@@ -68,11 +68,11 @@ public class RdfEndpointItem {
         this.uri = uri;
     }
 
-    public List<RdfEndpointItem> getStreams() {
+    public List<ExtensionsServiceEndpointItem> getStreams() {
         return streams;
     }
 
-    public void setStreams(List<RdfEndpointItem> streams) {
+    public void setStreams(List<ExtensionsServiceEndpointItem> streams) {
         this.streams = streams;
     }
 
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
index d434e68..a6c99a1 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
@@ -67,6 +67,8 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
 
     private long createdAt;
 
+    private String selectedEndpointUrl;
+
     public AdapterDescription() {
         super();
         this.rules = new ArrayList<>();
@@ -103,6 +105,7 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
         this.icon = other.getIcon();
         this.category = new Cloner().epaTypes(other.getCategory());
         this.createdAt = other.getCreatedAt();
+        this.selectedEndpointUrl = other.getSelectedEndpointUrl();
         if (other.getEventGrounding() != null) this.eventGrounding = new EventGrounding(other.getEventGrounding());
     }
 
@@ -236,4 +239,12 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
     public void setCreatedAt(long createdAt) {
         this.createdAt = createdAt;
     }
+
+    public String getSelectedEndpointUrl() {
+        return selectedEndpointUrl;
+    }
+
+    public void setSelectedEndpointUrl(String selectedEndpointUrl) {
+        this.selectedEndpointUrl = selectedEndpointUrl;
+    }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java
index f41ad26..d6b2cff 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/worker/ConnectWorkerContainer.java
@@ -22,6 +22,7 @@ import com.google.gson.annotations.SerializedName;
 import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.model.util.ElementIdGenerator;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -30,30 +31,34 @@ public class ConnectWorkerContainer extends UnnamedStreamPipesEntity {
 
     private @SerializedName("_rev") String rev;
 
+    private String serviceGroup;
+
+    private List<ProtocolDescription> protocols;
+
+    private List<AdapterDescription> adapters;
+
     public ConnectWorkerContainer() {
         super();
         this.adapters = new ArrayList<>();
         this.protocols = new ArrayList<>();
     }
 
-    private String endpointUrl;
-
-    private List<ProtocolDescription> protocols;
-
-    private List<AdapterDescription> adapters;
-
-    public ConnectWorkerContainer(String endpointUrl, List<ProtocolDescription> protocols, List<AdapterDescription> adapters) {
-        this.endpointUrl = endpointUrl;
+    public ConnectWorkerContainer(String serviceGroup,
+                                  List<ProtocolDescription> protocols,
+                                  List<AdapterDescription> adapters) {
+        super();
+        this.elementId = ElementIdGenerator.makeElementIdFromAppId(serviceGroup);
+        this.serviceGroup = serviceGroup;
         this.protocols = protocols;
         this.adapters = adapters;
     }
 
-    public String getEndpointUrl() {
-        return endpointUrl;
+    public String getServiceGroup() {
+        return serviceGroup;
     }
 
-    public void setEndpointUrl(String endpointUrl) {
-        this.endpointUrl = endpointUrl;
+    public void setServiceGroup(String serviceGroup) {
+        this.serviceGroup = serviceGroup;
     }
 
     public List<ProtocolDescription> getProtocols() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/dashboard/DashboardEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/dashboard/DashboardEntity.java
index 8aa2e21..dca69f2 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/dashboard/DashboardEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/dashboard/DashboardEntity.java
@@ -21,14 +21,13 @@ package org.apache.streampipes.model.dashboard;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.google.gson.annotations.SerializedName;
-import org.apache.streampipes.model.base.UnnamedStreamPipesEntity;
 import org.apache.streampipes.model.datalake.DataExplorerWidgetModel;
 
 @JsonSubTypes({
         @JsonSubTypes.Type(DashboardWidgetModel.class),
         @JsonSubTypes.Type(DataExplorerWidgetModel.class)
 })
-public abstract class DashboardEntity extends UnnamedStreamPipesEntity {
+public abstract class DashboardEntity {
 
   @JsonProperty("_id")
   private @SerializedName("_id") String id;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
index 06d0891..e0b50b5 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
@@ -18,8 +18,9 @@
 package org.apache.streampipes.manager.assets;
 
 import org.apache.http.client.fluent.Request;
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
-import org.apache.streampipes.manager.execution.http.PipelineElementEndpointGenerator;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+import org.apache.streampipes.manager.execution.http.ExtensionsServiceEndpointGenerator;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,17 +29,17 @@ public class AssetFetcher {
 
   private static final String ASSET_ENDPOINT_APPENDIX = "/assets";
 
-  private PipelineElementUrl pipelineElementUrl;
+  private SpServiceUrlProvider spServiceUrlProvider;
   private String appId;
 
-  public AssetFetcher(PipelineElementUrl pipelineElementUrl,
+  public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider,
                       String appId) {
-    this.pipelineElementUrl = pipelineElementUrl;
+    this.spServiceUrlProvider = spServiceUrlProvider;
     this.appId = appId;
   }
 
-  public InputStream fetchPipelineElementAssets() throws IOException {
-    String endpointUrl = new PipelineElementEndpointGenerator(appId, pipelineElementUrl).getEndpointResourceUrl();
+  public InputStream fetchPipelineElementAssets() throws IOException, NoServiceEndpointsAvailableException {
+    String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, spServiceUrlProvider).getEndpointResourceUrl();
     return Request
             .Get(endpointUrl + ASSET_ENDPOINT_APPENDIX)
             .execute()
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
index 19096c7..8fd172b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetManager.java
@@ -19,7 +19,8 @@ package org.apache.streampipes.manager.assets;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
 import java.io.File;
 import java.io.IOException;
@@ -42,9 +43,9 @@ public class AssetManager {
     return Files.readAllBytes(Paths.get(getAssetPath(appId, assetName)));
   }
 
-  public static void storeAsset(PipelineElementUrl pipelineElementUrl,
-                                String appId) throws IOException {
-    InputStream assetStream = new AssetFetcher(pipelineElementUrl, appId)
+  public static void storeAsset(SpServiceUrlProvider spServiceUrlProvider,
+                                String appId) throws IOException, NoServiceEndpointsAvailableException {
+    InputStream assetStream = new AssetFetcher(spServiceUrlProvider, appId)
             .fetchPipelineElementAssets();
     new AssetExtractor(assetStream, appId).extractAssetContents();
   }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
index a518056..1aa54c8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java
@@ -17,7 +17,7 @@
  */
 package org.apache.streampipes.manager.endpoint;
 
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
 
@@ -29,21 +29,21 @@ import java.util.stream.Stream;
 
 public class EndpointFetcher {
 
-  public List<RdfEndpoint> getEndpoints() {
-    List<String> endpoints = SpServiceDiscovery.getServiceDiscovery().getActivePeEndpoints();
-    List<RdfEndpoint> servicerdRdfEndpoints = new LinkedList<>();
+  public List<ExtensionsServiceEndpoint> getEndpoints() {
+    List<String> endpoints = SpServiceDiscovery.getServiceDiscovery().getActivePipelineElementEndpoints();
+    List<ExtensionsServiceEndpoint> servicerdExtensionsServiceEndpoints = new LinkedList<>();
 
     for (String endpoint : endpoints) {
-      RdfEndpoint rdfEndpoint =
-              new RdfEndpoint(endpoint);
-      servicerdRdfEndpoints.add(rdfEndpoint);
+      ExtensionsServiceEndpoint extensionsServiceEndpoint =
+              new ExtensionsServiceEndpoint(endpoint);
+      servicerdExtensionsServiceEndpoints.add(extensionsServiceEndpoint);
     }
-    List<RdfEndpoint> databasedRdfEndpoints = StorageDispatcher.INSTANCE.getNoSqlStore()
+    List<ExtensionsServiceEndpoint> databasedExtensionsServiceEndpoints = StorageDispatcher.INSTANCE.getNoSqlStore()
             .getRdfEndpointStorage()
-            .getRdfEndpoints();
+            .getExtensionsServiceEndpoints();
 
-    List<RdfEndpoint> concatList =
-            Stream.of(databasedRdfEndpoints, servicerdRdfEndpoints)
+    List<ExtensionsServiceEndpoint> concatList =
+            Stream.of(databasedExtensionsServiceEndpoints, servicerdExtensionsServiceEndpoints)
                     .flatMap(Collection::stream)
                     .collect(Collectors.toList());
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java
index 9986b15..a6745d2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java
@@ -21,8 +21,8 @@ package org.apache.streampipes.manager.endpoint;
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.message.BasicHeader;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
-import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,19 +35,19 @@ import java.util.List;
 public class EndpointItemFetcher {
     Logger logger = LoggerFactory.getLogger(EndpointItemFetcher.class);
 
-    private List<RdfEndpoint> rdfEndpoints;
+    private List<ExtensionsServiceEndpoint> extensionsServiceEndpoints;
 
-    public EndpointItemFetcher(List<RdfEndpoint> rdfEndpoints) {
-        this.rdfEndpoints = rdfEndpoints;
+    public EndpointItemFetcher(List<ExtensionsServiceEndpoint> extensionsServiceEndpoints) {
+        this.extensionsServiceEndpoints = extensionsServiceEndpoints;
     }
 
-    public List<RdfEndpointItem> getItems() {
-        List<RdfEndpointItem> endpointItems = new ArrayList<>();
-        rdfEndpoints.forEach(e -> endpointItems.addAll(getEndpointItems(e)));
+    public List<ExtensionsServiceEndpointItem> getItems() {
+        List<ExtensionsServiceEndpointItem> endpointItems = new ArrayList<>();
+        extensionsServiceEndpoints.forEach(e -> endpointItems.addAll(getEndpointItems(e)));
         return endpointItems;
     }
 
-    private List<RdfEndpointItem> getEndpointItems(RdfEndpoint e) {
+    private List<ExtensionsServiceEndpointItem> getEndpointItems(ExtensionsServiceEndpoint e) {
         try {
             String result = Request.Get(e.getEndpointUrl())
                     .addHeader(new BasicHeader("Accept", MediaType.APPLICATION_JSON))
@@ -56,7 +56,7 @@ public class EndpointItemFetcher {
                     .returnContent()
                     .asString();
 
-            return JacksonSerializer.getObjectMapper().readValue(result, new TypeReference<List<RdfEndpointItem>>() {});
+            return JacksonSerializer.getObjectMapper().readValue(result, new TypeReference<List<ExtensionsServiceEndpointItem>>() {});
         } catch (IOException e1) {
             logger.warn("Processing Element Descriptions could not be fetched from RDF endpoint: " + e.getEndpointUrl());
             return new ArrayList<>();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ExtensionsServiceEndpointGenerator.java
new file mode 100644
index 0000000..4f73d8d
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/ExtensionsServiceEndpointGenerator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ExtensionsServiceEndpointGenerator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExtensionsServiceEndpointGenerator.class);
+
+  private String appId;
+  private SpServiceUrlProvider spServiceUrlProvider;
+
+  public ExtensionsServiceEndpointGenerator(String appId,
+                                            SpServiceUrlProvider spServiceUrlProvider) {
+    this.appId = appId;
+    this.spServiceUrlProvider = spServiceUrlProvider;
+  }
+
+  public String getEndpointResourceUrl() throws NoServiceEndpointsAvailableException {
+    return spServiceUrlProvider.getInvocationUrl(selectService(), appId);
+  }
+
+  public String getEndpointBaseUrl() throws NoServiceEndpointsAvailableException {
+    return selectService();
+  }
+
+  private List<String> getServiceEndpoints() {
+    return SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
+            Collections.singletonList(this.spServiceUrlProvider.getServiceTag(appId).asString()));
+  }
+
+  private String selectService() throws NoServiceEndpointsAvailableException {
+    List<String> serviceEndpoints = getServiceEndpoints();
+    if (serviceEndpoints.size() > 0) {
+      return getServiceEndpoints().get(0);
+    } else {
+      LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, this.spServiceUrlProvider.getServiceTag(appId));
+      throw new NoServiceEndpointsAvailableException("Could not find any matching service endpoints");
+    }
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java
deleted file mode 100644
index 3449f9e..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementEndpointGenerator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.http;
-
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-
-public class PipelineElementEndpointGenerator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(PipelineElementEndpointGenerator.class);
-
-  private String appId;
-  private PipelineElementUrl pipelineElementUrl;
-
-  public PipelineElementEndpointGenerator(String appId,
-                                          PipelineElementUrl pipelineElementUrl) {
-    this.appId = appId;
-    this.pipelineElementUrl = pipelineElementUrl;
-  }
-
-  public String getEndpointResourceUrl() {
-    List<String> endpoints = getServiceEndpoints();
-    return pipelineElementUrl.getInvocationUrl(selectService(endpoints), appId);
-  }
-
-  private List<String> getServiceEndpoints() {
-    return SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints("pe", true, Collections.singletonList(this.appId));
-  }
-
-  private String selectService(List<String> availableServices) {
-    return availableServices.get(0);
-  }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index 8cae245..64cd83c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -18,7 +18,9 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
 import org.apache.streampipes.manager.secret.SecretProvider;
 import org.apache.streampipes.manager.util.TemporaryGraphStorage;
@@ -80,31 +82,56 @@ public class PipelineExecutor {
 
     decryptSecrets(graphs);
 
-    graphs.forEach(g -> g.setSelectedEndpointUrl(new PipelineElementEndpointGenerator(
-            g.getAppId(),
-            getPipelineElementType(g))
-            .getEndpointResourceUrl()));
+    List<InvocableStreamPipesEntity> failedServices = new ArrayList<>();
+      graphs.forEach(g -> {
+        try {
+          g.setSelectedEndpointUrl(findSelectedEndpoint(g));
+        } catch (NoServiceEndpointsAvailableException e) {
+          failedServices.add(g);
+        }
+      });
 
-    PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
-            pipeline.getName(), graphs, dataSets)
-            .invokeGraphs();
+    PipelineOperationStatus status;
+    if (failedServices.size() == 0) {
 
-    encryptSecrets(graphs);
+      status = new GraphSubmitter(pipeline.getPipelineId(),
+              pipeline.getName(), graphs, dataSets)
+              .invokeGraphs();
 
-    if (status.isSuccess()) {
-      storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+      encryptSecrets(graphs);
 
-      PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
-              new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), PipelineStatusMessageType.PIPELINE_STARTED.title(), PipelineStatusMessageType.PIPELINE_STARTED.description()));
+      if (status.isSuccess()) {
+        storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
 
-      if (storeStatus) {
-        pipeline.setHealthStatus(PipelineHealthStatus.OK);
-        setPipelineStarted(pipeline);
+        PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
+                new PipelineStatusMessage(pipeline.getPipelineId(), System.currentTimeMillis(), PipelineStatusMessageType.PIPELINE_STARTED.title(), PipelineStatusMessageType.PIPELINE_STARTED.description()));
+
+        if (storeStatus) {
+          pipeline.setHealthStatus(PipelineHealthStatus.OK);
+          setPipelineStarted(pipeline);
+        }
       }
+    } else {
+      List<PipelineElementStatus> pe = failedServices.stream().map(fs ->
+              new PipelineElementStatus(fs.getElementId(),
+                      fs.getName(),
+                      false,
+                      "No active supporting service found")).collect(Collectors.toList());
+      status = new PipelineOperationStatus(pipeline.getPipelineId(),
+              pipeline.getName(),
+              "Could not start pipeline " + pipeline.getName() + ".",
+              pe);
     }
     return status;
   }
 
+  private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException {
+    return new ExtensionsServiceEndpointGenerator(
+            g.getAppId(),
+            getPipelineElementType(g))
+            .getEndpointResourceUrl();
+  }
+
   private void updateGroupIds(InvocableStreamPipesEntity entity) {
     entity.getInputStreams()
             .stream()
@@ -180,8 +207,8 @@ public class PipelineExecutor {
     return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
   }
 
-  private PipelineElementUrl getPipelineElementType(InvocableStreamPipesEntity entity) {
-    return entity instanceof DataProcessorInvocation ? PipelineElementUrl.DATA_PROCESSOR : PipelineElementUrl.DATA_SINK;
+  private SpServiceUrlProvider getPipelineElementType(InvocableStreamPipesEntity entity) {
+    return entity instanceof DataProcessorInvocation ? SpServiceUrlProvider.DATA_PROCESSOR : SpServiceUrlProvider.DATA_SINK;
   }
 
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index c7f0295..1c21088 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -36,8 +36,8 @@ import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
 import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
-import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
 import org.apache.streampipes.model.message.DataSetModificationMessage;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.message.PipelineModificationMessage;
@@ -150,7 +150,7 @@ public class Operations {
     return new PipelineExecutor(pipeline, visualize, storeStatus, monitor, forceStop).stopPipeline();
   }
 
-  public static List<RdfEndpointItem> getEndpointUriContents(List<RdfEndpoint> endpoints) {
+  public static List<ExtensionsServiceEndpointItem> getEndpointUriContents(List<ExtensionsServiceEndpoint> endpoints) {
     return new EndpointItemFetcher(endpoints).getItems();
   }
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index 98ccd81..7c29181 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -18,10 +18,10 @@
 
 package org.apache.streampipes.manager.setup;
 
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.message.Notifications;
-import org.apache.streampipes.storage.couchdb.impl.RdfEndpointStorageImpl;
+import org.apache.streampipes.storage.couchdb.impl.ExtensionsServiceEndpointStorageImpl;
 import org.apache.streampipes.storage.couchdb.utils.Utils;
 import org.lightcouch.DesignDocument;
 import org.lightcouch.DesignDocument.MapReduce;
@@ -94,10 +94,10 @@ public class CouchDbInstallationStep implements InstallationStep {
     }
 
     private Message addRdfEndpoints() {
-        RdfEndpointStorageImpl rdfEndpointStorage = new RdfEndpointStorageImpl();
+        ExtensionsServiceEndpointStorageImpl rdfEndpointStorage = new ExtensionsServiceEndpointStorageImpl();
         initRdfEndpointPorts
                 .forEach(p -> rdfEndpointStorage
-                        .addRdfEndpoint(new RdfEndpoint(initRdfEndpointHost + p)));
+                        .addExtensionsServiceEndpoint(new ExtensionsServiceEndpoint(initRdfEndpointHost + p)));
 
         return Notifications.success("Discovering pipeline element endpoints...");
     }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
index 461446b..d6ad3e4 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
@@ -19,7 +19,7 @@
 package org.apache.streampipes.manager.setup;
 
 import org.apache.streampipes.manager.endpoint.EndpointFetcher;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
 import org.apache.streampipes.model.client.setup.InitialSettings;
 
 import java.util.ArrayList;
@@ -35,7 +35,7 @@ public class InstallationConfiguration {
 		steps.add(new UserRegistrationInstallationStep(settings.getAdminEmail(), settings.getAdminPassword()));
 
 		if (settings.getInstallPipelineElements()) {
-			for(RdfEndpoint endpoint : new EndpointFetcher().getEndpoints()) {
+			for(ExtensionsServiceEndpoint endpoint : new EndpointFetcher().getEndpoints()) {
 				steps.add(new PipelineElementInstallationStep(endpoint, settings.getAdminEmail()));
 			}
 		}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
index d00a66c..109bc69 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
@@ -19,8 +19,8 @@ package org.apache.streampipes.manager.setup;
 
 import org.apache.streampipes.manager.endpoint.EndpointItemParser;
 import org.apache.streampipes.manager.operations.Operations;
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
-import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.message.Notifications;
 
@@ -30,10 +30,10 @@ import java.util.List;
 
 public class PipelineElementInstallationStep implements InstallationStep {
 
-  private RdfEndpoint endpoint;
+  private ExtensionsServiceEndpoint endpoint;
   private String userEmail;
 
-  public PipelineElementInstallationStep(RdfEndpoint endpoint, String userEmail) {
+  public PipelineElementInstallationStep(ExtensionsServiceEndpoint endpoint, String userEmail) { // endpoint: its items are fetched in install(); userEmail: handed to the endpoint item parser there
     this.endpoint = endpoint;
     this.userEmail = userEmail;
   }
@@ -41,8 +41,8 @@ public class PipelineElementInstallationStep implements InstallationStep {
   @Override
   public List<Message> install() {
     List<Message> statusMessages = new ArrayList<>();
-    List<RdfEndpointItem> items = Operations.getEndpointUriContents(Collections.singletonList(endpoint));
-    for(RdfEndpointItem item : items) {
+    List<ExtensionsServiceEndpointItem> items = Operations.getEndpointUriContents(Collections.singletonList(endpoint));
+    for(ExtensionsServiceEndpointItem item : items) {
       statusMessages.add(new EndpointItemParser().parseAndAddEndpointItem(item.getUri(),
               userEmail, true, false));
     }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
index e18fd32..f0fd68a 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataProcessorVerifier.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.manager.verification;
 
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.manager.assets.AssetManager;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
@@ -65,14 +66,14 @@ public class DataProcessorVerifier extends ElementVerifier<DataProcessorDescript
   }
 
   @Override
-  protected void storeAssets() throws IOException {
+  protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException { // throws clause widened to match the abstract declaration in ElementVerifier
     if (elementDescription.isIncludesAssets()) {
-      AssetManager.storeAsset(PipelineElementUrl.DATA_PROCESSOR, elementDescription.getAppId());
+      AssetManager.storeAsset(SpServiceUrlProvider.DATA_PROCESSOR, elementDescription.getAppId()); // only reached when the description declares assets (isIncludesAssets)
     }
   }
 
   @Override
-  protected void updateAssets() throws IOException {
+  protected void updateAssets() throws IOException, NoServiceEndpointsAvailableException {
     if (elementDescription.isIncludesAssets()) {
       AssetManager.deleteAsset(elementDescription.getAppId());
       storeAssets();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
index a8c6060..6ff2721 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataSinkVerifier.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.manager.verification;
 
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.manager.assets.AssetManager;
 import org.apache.streampipes.model.graph.DataSinkDescription;
@@ -74,14 +75,14 @@ public class DataSinkVerifier extends ElementVerifier<DataSinkDescription> {
 	}
 
 	@Override
-	protected void storeAssets() throws IOException  {
+	protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException { // throws clause widened to match the abstract declaration in ElementVerifier
 		if (elementDescription.isIncludesAssets()) {
-			AssetManager.storeAsset(PipelineElementUrl.DATA_SINK, elementDescription.getAppId());
+			AssetManager.storeAsset(SpServiceUrlProvider.DATA_SINK, elementDescription.getAppId()); // only reached when the description declares assets (isIncludesAssets)
 		}
 	}
 
 	@Override
-	protected void updateAssets() throws IOException {
+	protected void updateAssets() throws IOException, NoServiceEndpointsAvailableException {
 		if (elementDescription.isIncludesAssets()) {
 			AssetManager.deleteAsset(elementDescription.getAppId());
 			storeAssets();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
index b7d1b90..337acc9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/DataStreamVerifier.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.manager.verification;
 
-import org.apache.streampipes.commons.constants.PipelineElementUrl;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 import org.apache.streampipes.manager.assets.AssetManager;
 import org.apache.streampipes.model.SpDataStream;
 
@@ -71,14 +72,14 @@ public class DataStreamVerifier extends ElementVerifier<SpDataStream> {
   }
 
   @Override
-  protected void storeAssets() throws IOException {
+  protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException { // throws clause widened to match the abstract declaration in ElementVerifier
     if (elementDescription.isIncludesAssets()) {
-      AssetManager.storeAsset(PipelineElementUrl.DATA_STREAM, elementDescription.getAppId());
+      AssetManager.storeAsset(SpServiceUrlProvider.DATA_STREAM, elementDescription.getAppId()); // only reached when the description declares assets (isIncludesAssets)
     }
   }
 
   @Override
-  protected void updateAssets() throws IOException {
+  protected void updateAssets() throws IOException, NoServiceEndpointsAvailableException {
     if (elementDescription.isIncludesAssets()) {
       AssetManager.deleteAsset(elementDescription.getAppId());
       storeAssets();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
index e928634..554ea6e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/verification/ElementVerifier.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.manager.verification;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.manager.storage.UserManagementService;
 import org.apache.streampipes.manager.storage.UserService;
@@ -97,7 +98,7 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
       if (state == StorageState.STORED) {
         try {
           storeAssets();
-        } catch (IOException e) {
+        } catch (IOException | NoServiceEndpointsAvailableException e) {
           e.printStackTrace();
         }
         return successMessage();
@@ -123,7 +124,7 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
       update(username);
       try {
         updateAssets();
-      } catch (IOException e) {
+      } catch (IOException | NoServiceEndpointsAvailableException e) {
         e.printStackTrace();
       }
       return successMessage();
@@ -133,9 +134,9 @@ public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {
 
   }
 
-  protected abstract void storeAssets() throws IOException;
+  protected abstract void storeAssets() throws IOException, NoServiceEndpointsAvailableException;
 
-  protected abstract void updateAssets() throws IOException;
+  protected abstract void updateAssets() throws IOException, NoServiceEndpointsAvailableException;
 
   private Message errorMessage() {
     return new ErrorMessage(elementDescription.getName(), collectNotifications());
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/RdfEndpoint.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ExtensionsServiceEndpointResource.java
similarity index 76%
rename from streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/RdfEndpoint.java
rename to streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ExtensionsServiceEndpointResource.java
index e80720d..c5dc776 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/RdfEndpoint.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ExtensionsServiceEndpointResource.java
@@ -23,9 +23,10 @@ import org.apache.streampipes.manager.endpoint.EndpointFetcher;
 import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.client.endpoint.RdfEndpointItem;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
 import org.apache.streampipes.rest.shared.annotation.GsonWithIds;
-import org.apache.streampipes.storage.api.IRdfEndpointStorage;
+import org.apache.streampipes.storage.api.IExtensionsServiceEndpointStorage;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
@@ -36,7 +37,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 @Path("/v2/users/{username}/rdfendpoints")
-public class RdfEndpoint extends AbstractRestResource {
+public class ExtensionsServiceEndpointResource extends AbstractRestResource {
 
   @GET
   @Produces(MediaType.APPLICATION_JSON)
@@ -49,9 +50,9 @@ public class RdfEndpoint extends AbstractRestResource {
   @POST
   @Produces(MediaType.APPLICATION_JSON)
   @GsonWithIds
-  public Response addRdfEndpoint(org.apache.streampipes.model.client.endpoint.RdfEndpoint rdfEndpoint) {
+  public Response addRdfEndpoint(ExtensionsServiceEndpoint extensionsServiceEndpoint) { // model type is imported now that the resource class no longer shares its simple name
     getRdfEndpointStorage()
-            .addRdfEndpoint(rdfEndpoint);
+            .addExtensionsServiceEndpoint(extensionsServiceEndpoint); // hands the posted endpoint to storage; the method then answers 200 OK without an entity
 
     return Response.status(Response.Status.OK).build();
   }
@@ -64,7 +65,7 @@ public class RdfEndpoint extends AbstractRestResource {
   @GsonWithIds
   public Response removeRdfEndpoint(@PathParam("rdfEndpointId") String rdfEndpointId) {
     getRdfEndpointStorage()
-            .removeRdfEndpoint(rdfEndpointId);
+            .removeExtensionsServiceEndpoint(rdfEndpointId);
 
     return Response.status(Response.Status.OK).build();
   }
@@ -74,9 +75,9 @@ public class RdfEndpoint extends AbstractRestResource {
   @Produces(MediaType.APPLICATION_JSON)
   @GsonWithIds
   public Response getEndpointContents(@PathParam("username") String username) {
-    List<org.apache.streampipes.model.client.endpoint.RdfEndpoint> endpoints = getEndpoints();
+    List<ExtensionsServiceEndpoint> endpoints = getEndpoints();
 
-    List<RdfEndpointItem> items = Operations.getEndpointUriContents(endpoints);
+    List<ExtensionsServiceEndpointItem> items = Operations.getEndpointUriContents(endpoints);
     items.forEach(item -> item.setInstalled(isInstalled(item.getElementId(), username)));
 
     // also add installed elements that are currently not running or available
@@ -90,7 +91,7 @@ public class RdfEndpoint extends AbstractRestResource {
   @POST
   @Path("/items/icon")
   @Produces("image/png")
-  public Response getEndpointItemIcon(RdfEndpointItem endpointItem) {
+  public Response getEndpointItemIcon(ExtensionsServiceEndpointItem endpointItem) {
     try {
       byte[] imageBytes = Request.Get(makeIconUrl(endpointItem)).execute().returnContent().asBytes();
       return ok(imageBytes);
@@ -99,11 +100,11 @@ public class RdfEndpoint extends AbstractRestResource {
     }
   }
 
-  private String makeIconUrl(RdfEndpointItem endpointItem) {
+  private String makeIconUrl(ExtensionsServiceEndpointItem endpointItem) { // icon URL = the item's URI followed by "/assets/icon"
     return endpointItem.getUri() + "/assets/icon";
   }
 
-  private List<org.apache.streampipes.model.client.endpoint.RdfEndpoint> getEndpoints() {
+  private List<ExtensionsServiceEndpoint> getEndpoints() { // delegates to a newly created EndpointFetcher on every call
     return new EndpointFetcher().getEndpoints();
   }
 
@@ -121,25 +122,25 @@ public class RdfEndpoint extends AbstractRestResource {
     return elementUris;
   }
 
-  private List<RdfEndpointItem> getAllDataStreamEndpoints(String username, List<RdfEndpointItem> existingItems) {
+  private List<ExtensionsServiceEndpointItem> getAllDataStreamEndpoints(String username, List<ExtensionsServiceEndpointItem> existingItems) { // builds items for entries of getAllDataStreamUris(username) that existingItems does not already contain
     return getAllDataStreamUris(username)
             .stream()
-            .filter(s -> existingItems.stream().noneMatch(item -> item.getElementId().equals(s)))
+            .filter(s -> existingItems.stream().noneMatch(item -> s.equals(item.getElementId()))) // equals() is now invoked on s, so an item whose element id is null compares unequal instead of throwing (assumes s itself is non-null)
             .map(s -> getPipelineElementStorage().getDataStreamById(s))
             .map(stream -> makeItem(stream, stream instanceof SpDataSet ? "set" : "stream"))
             .collect(Collectors.toList());
   }
 
-  private List<RdfEndpointItem> getAllDataProcessorEndpoints(String username, List<RdfEndpointItem> existingItems) {
+  private List<ExtensionsServiceEndpointItem> getAllDataProcessorEndpoints(String username, List<ExtensionsServiceEndpointItem> existingItems) { // builds "sepa" items for entries of getAllDataProcessorUris(username) that existingItems does not already contain
     return getAllDataProcessorUris(username)
             .stream()
-            .filter(s -> existingItems.stream().noneMatch(item -> item.getElementId().equals(s)))
+            .filter(s -> existingItems.stream().noneMatch(item -> s.equals(item.getElementId()))) // equals() is now invoked on s, so an item whose element id is null compares unequal instead of throwing (assumes s itself is non-null)
             .map(s -> getPipelineElementStorage().getDataProcessorById(s))
             .map(source -> makeItem(source, "sepa"))
             .collect(Collectors.toList());
   }
 
-  private List<RdfEndpointItem> getAllDataSinkEndpoints(String username, List<RdfEndpointItem> existingItems) {
+  private List<ExtensionsServiceEndpointItem> getAllDataSinkEndpoints(String username, List<ExtensionsServiceEndpointItem> existingItems) {
     return getAllDataSinkUris(username)
             .stream()
             .filter(s -> existingItems.stream().noneMatch(item -> s.equals(item.getElementId())))
@@ -148,8 +149,8 @@ public class RdfEndpoint extends AbstractRestResource {
             .collect(Collectors.toList());
   }
 
-  private RdfEndpointItem makeItem(NamedStreamPipesEntity entity, String type) {
-    RdfEndpointItem endpoint = new RdfEndpointItem();
+  private ExtensionsServiceEndpointItem makeItem(NamedStreamPipesEntity entity, String type) {
+    ExtensionsServiceEndpointItem endpoint = new ExtensionsServiceEndpointItem();
     endpoint.setInstalled(true);
     endpoint.setDescription(entity.getDescription());
     endpoint.setName(entity.getName());
@@ -172,7 +173,7 @@ public class RdfEndpoint extends AbstractRestResource {
     return getUserService().getOwnActionUris(username);
   }
 
-  private IRdfEndpointStorage getRdfEndpointStorage() {
+  private IExtensionsServiceEndpointStorage getRdfEndpointStorage() { // only the return type was renamed; this accessor and the NoSQL storage getter keep the legacy "RdfEndpoint" name
     return getNoSqlStorage().getRdfEndpointStorage();
   }
 }
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
index 2d6630e..40be32b 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java
@@ -17,7 +17,7 @@
  */
 package org.apache.streampipes.svcdiscovery.api;
 
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
 
 import java.util.List;
 import java.util.Map;
@@ -33,14 +33,21 @@ public interface ISpServiceDiscovery {
    * @param port      port of service endpoint
    * @param tags      tags of service
    */
-  void registerService(String svcGroup, String svcId, String host, int port, List<SpServiceTag> tags);
+  void registerService(SpServiceRegistrationRequest serviceRegistrationRequest); // NOTE(review): the Javadoc above still lists the former port/tags parameters; they are now carried by the request object
 
   /**
    * Get active pipeline element service endpoints
    *
    * @return list of pipeline element endpoints
    */
-  List<String> getActivePeEndpoints();
+  List<String> getActivePipelineElementEndpoints();
+
+  /**
+   * Get active StreamPipes Connect worker endpoints
+   *
+   * @return list of StreamPipes Connect worker endpoints
+   */
+  List<String> getActiveConnectWorkerEndpoints();
 
   /**
    * Get service endpoints
@@ -50,7 +57,8 @@ public interface ISpServiceDiscovery {
    * @param filterByTags        filter param to filter list of registered services
    * @return                    list of services
    */
-  List<String> getServiceEndpoints(String svcGroup, boolean restrictToHealthy,
+  List<String> getServiceEndpoints(String svcGroup,
+                                   boolean restrictToHealthy,
                                    List<String> filterByTags);
 
   /**
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
index d263f07..5fd3452 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceGroups.java
@@ -20,5 +20,6 @@ package org.apache.streampipes.svcdiscovery.api.model;
 public class DefaultSpServiceGroups {
 
   public static final String CORE = "core";
+  public static final String EXT = "ext"; // group name SpConsulServiceDiscovery passes to getServiceEndpoints when discovering pipeline element endpoints
 
 }
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
index 093df8c..fc4f980 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/DefaultSpServiceTags.java
@@ -19,9 +19,9 @@ package org.apache.streampipes.svcdiscovery.api.model;
 
 public class DefaultSpServiceTags {
 
-  public static final String CORE = "core";
-  public static final String PE = "pe";
-  public static final String CONNECT_MASTER = "connect-master";
-  public static final String CONNECT_WORKER = "connect-worker";
-  public static final String STREAMPIPES_CLIENT = "streampipes-client";
+  public static final SpServiceTag CORE = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "core"); // constants are now SpServiceTag objects under the SYSTEM prefix rather than bare strings
+  public static final SpServiceTag PE = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "pe"); // converted with asString() where SpConsulServiceDiscovery uses it as an endpoint filter
+  public static final SpServiceTag CONNECT_MASTER = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "connect-master");
+  public static final SpServiceTag CONNECT_WORKER = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "connect-worker");
+  public static final SpServiceTag STREAMPIPES_CLIENT = SpServiceTag.create(SpServiceTagPrefix.SYSTEM, "streampipes-client");
 }
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementPrefix.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
similarity index 81%
rename from streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementPrefix.java
rename to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
index d391381..970ca73 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementPrefix.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServicePathPrefix.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.commons.constants;
+package org.apache.streampipes.svcdiscovery.api.model;
 
-public class PipelineElementPrefix {
+public class SpServicePathPrefix { // string prefixes that the SpServiceUrlProvider constants pass to their constructor
 
   public static final String DATA_PROCESSOR = "sepa";
   public static final String DATA_SINK = "sec";
   public static final String DATA_STREAM = "stream";
   public static final String DATA_SET = "set";
-  public static final String ADAPTER = "adapter";
+  public static final String ADAPTER = "api/v1/worker/adapters"; // value changed from "adapter" to a multi-segment path
+  public static final String PROTOCOL = "api/v1/worker/protocols"; // constant added by this change
 
 }
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceRegistrationRequest.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceRegistrationRequest.java
new file mode 100644
index 0000000..4926cdd
--- /dev/null
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceRegistrationRequest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.svcdiscovery.api.model;
+
+import java.util.List;
+
+public class SpServiceRegistrationRequest { // mutable parameter object accepted by ISpServiceDiscovery.registerService
+
+  private String svcGroup;
+  private String svcId;
+  private String host;
+  private int port;
+  private List<SpServiceTag> tags;
+  private String healthCheckPath; // initially "" when the request is created through the five-argument from(...)
+
+  public static SpServiceRegistrationRequest from(String svcGroup, // factory without a health check path: passes "" for it
+                                                  String svcId,
+                                                  String host,
+                                                  Integer port, // boxed here but unboxed for the int constructor parameter; a null port throws NullPointerException
+                                                  List<SpServiceTag> tags) {
+    return new SpServiceRegistrationRequest(svcGroup, svcId, host, port, tags, "");
+  }
+
+  public static SpServiceRegistrationRequest from(String svcGroup, // factory that additionally sets the health check path
+                                                  String svcId,
+                                                  String host,
+                                                  Integer port, // boxed here but unboxed for the int constructor parameter; a null port throws NullPointerException
+                                                  List<SpServiceTag> tags,
+                                                  String healthCheckPath) {
+    return new SpServiceRegistrationRequest(svcGroup, svcId, host, port, tags, healthCheckPath);
+  }
+
+  public SpServiceRegistrationRequest(String svcGroup, // plain field-by-field assignment; arguments are neither validated nor copied
+                                      String svcId,
+                                      String host,
+                                      int port,
+                                      List<SpServiceTag> tags,
+                                      String healthCheckPath) {
+    this.svcGroup = svcGroup;
+    this.svcId = svcId;
+    this.host = host;
+    this.port = port;
+    this.tags = tags; // the caller's list is stored by reference
+    this.healthCheckPath = healthCheckPath;
+  }
+
+  public String getSvcGroup() {
+    return svcGroup;
+  }
+
+  public void setSvcGroup(String svcGroup) {
+    this.svcGroup = svcGroup;
+  }
+
+  public String getSvcId() {
+    return svcId;
+  }
+
+  public void setSvcId(String svcId) {
+    this.svcId = svcId;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  public List<SpServiceTag> getTags() { // returns the internal list itself, not a copy
+    return tags;
+  }
+
+  public void setTags(List<SpServiceTag> tags) {
+    this.tags = tags;
+  }
+
+  public String getHealthCheckPath() {
+    return healthCheckPath;
+  }
+
+  public void setHealthCheckPath(String healthCheckPath) {
+    this.healthCheckPath = healthCheckPath;
+  }
+}
diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
index 12dde94..220f813 100644
--- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceTagPrefix.java
@@ -23,7 +23,8 @@ public enum SpServiceTagPrefix {
   PROTOCOL("protocol"),
   DATA_STREAM("dstream"),
   DATA_PROCESSOR("dprocessor"),
-  DATA_SINK("dsink");
+  DATA_SINK("dsink"),
+  DATA_SET("dset");
 
   private String prefix;
 
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementUrl.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
similarity index 67%
rename from streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementUrl.java
rename to streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
index 58d0ba7..68f54c2 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/PipelineElementUrl.java
+++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/model/SpServiceUrlProvider.java
@@ -15,22 +15,27 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.commons.constants;
+package org.apache.streampipes.svcdiscovery.api.model;
 
-public enum PipelineElementUrl {
+public enum SpServiceUrlProvider {
 
-  DATA_PROCESSOR(PipelineElementPrefix.DATA_PROCESSOR),
-  DATA_SINK(PipelineElementPrefix.DATA_SINK),
-  DATA_STREAM(PipelineElementPrefix.DATA_STREAM),
-  DATA_SET(PipelineElementPrefix.DATA_SET),
-  ADAPTER(PipelineElementPrefix.ADAPTER);
+  DATA_PROCESSOR(SpServicePathPrefix.DATA_PROCESSOR, SpServiceTagPrefix.DATA_PROCESSOR),
+  DATA_SINK(SpServicePathPrefix.DATA_SINK, SpServiceTagPrefix.DATA_SINK),
+  DATA_STREAM(SpServicePathPrefix.DATA_STREAM, SpServiceTagPrefix.DATA_STREAM),
+  DATA_SET(SpServicePathPrefix.DATA_SET, SpServiceTagPrefix.DATA_SET),
+  ADAPTER(SpServicePathPrefix.ADAPTER, SpServiceTagPrefix.ADAPTER),
+  PROTOCOL(SpServicePathPrefix.PROTOCOL, SpServiceTagPrefix.PROTOCOL);
 
   private final String HTTP = "http://";
   private final String SLASH = "/";
+
   private final String prefix;
+  private final SpServiceTagPrefix serviceTagPrefix;
 
-  PipelineElementUrl(String prefix) {
+  SpServiceUrlProvider(String prefix, // path prefix; every constant supplies one from SpServicePathPrefix
+                       SpServiceTagPrefix serviceTagPrefix) { // tag prefix that getServiceTag later combines with an app id
     this.prefix = prefix;
+    this.serviceTagPrefix = serviceTagPrefix;
   }
 
   public String getPrefix() {
@@ -72,5 +77,11 @@ public enum PipelineElementUrl {
             + invocationId;
   }
 
+  public SpServiceTagPrefix getServiceTagPrefix() { // returns the tag prefix this constant was constructed with
+    return serviceTagPrefix;
+  }
 
+  public SpServiceTag getServiceTag(String appId) { // creates a tag from this constant's tag prefix and the given app id
+    return SpServiceTag.create(serviceTagPrefix, appId);
+  }
 }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
index 105d6da..884546e 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/SpConsulServiceDiscovery.java
@@ -27,6 +27,9 @@ import org.apache.http.client.fluent.Request;
 import org.apache.http.entity.StringEntity;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceRegistrationRequest;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
 import org.apache.streampipes.svcdiscovery.consul.model.ConsulServiceRegistrationBody;
 import org.apache.streampipes.svcdiscovery.consul.model.HealthCheckConfiguration;
@@ -48,16 +51,12 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
   private static final String PE_SVC_TAG = "pe";
 
   @Override
-  public void registerService(String svcGroup,
-                              String svcId,
-                              String host,
-                              int port,
-                              List<SpServiceTag> tags) {
+  public void registerService(SpServiceRegistrationRequest req) {
     boolean connected = false;
 
     while (!connected) {
-      LOG.info("Trying to register service at Consul with svcGroup={}, svcId={} host={}, port={}. ", svcGroup, svcId, host, port);
-      ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(svcGroup, svcId, host, port, asString(tags));
+      LOG.info("Trying to register service at Consul with svcGroup={}, svcId={} host={}, port={}. ", req.getSvcGroup(), req.getSvcId(), req.getHost(), req.getPort());
+      ConsulServiceRegistrationBody svcRegistration = createRegistrationBody(req);
       connected = registerServiceHttpClient(svcRegistration);
 
       if (!connected) {
@@ -69,7 +68,7 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
         }
       }
     }
-    LOG.info("Successfully registered service at Consul: " + svcId);
+    LOG.info("Successfully registered service at Consul: " + req.getSvcId());
   }
 
   private List<String> asString(List<SpServiceTag> tags) {
@@ -77,9 +76,17 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
   }
 
   @Override
-  public List<String> getActivePeEndpoints() {
-    LOG.info("Load active pipeline element service endpoints");
-    return getServiceEndpoints(PE_SVC_TAG, true, Collections.singletonList(PE_SVC_TAG));
+  public List<String> getActivePipelineElementEndpoints() {
+    LOG.info("Discovering active pipeline element service endpoints");
+    return getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
+            Collections.singletonList(DefaultSpServiceTags.PE.asString()));
+  }
+
+  @Override
+  public List<String> getActiveConnectWorkerEndpoints() {
+    LOG.info("Discovering active StreamPipes Connect worker service endpoints");
+    return getServiceEndpoints(DefaultSpServiceGroups.EXT, true,
+            Collections.singletonList(DefaultSpServiceTags.CONNECT_WORKER.asString()));
   }
 
   @Override
@@ -144,24 +151,6 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
    */
   private boolean registerServiceHttpClient(ConsulServiceRegistrationBody svcRegistration) {
     try {
-
-//      Consul client = Consul.builder().withHostAndPort(HostAndPort.fromParts("localhost", 8500)).build();
-//      AgentClient agentClient = client.agentClient();
-//
-//      String serviceId = "2";
-//      Registration service = ImmutableRegistration.builder()
-//              .id(serviceId)
-//              .name(svcRegistration.getName())
-//              .port(svcRegistration.getPort())
-//              .address("http://" + InetAddress.getLocalHost().getHostAddress())
-//              .check(Registration.RegCheck.http(InetAddress.getLocalHost().getHostAddress() + COLON + svcRegistration.getPort(), 10000))
-//                   //.check(Registration.RegCheck.ttl(3L)) // registers with a TTL of 3 seconds
-//              .tags(Collections.singletonList("tag1"))
-//              .meta(Collections.singletonMap("version", "1.0"))
-//              .build();
-//
-//      agentClient.register(service);
-
       String endpoint = makeConsulEndpoint();
       String body = JacksonSerializer.getObjectMapper().writeValueAsString(svcRegistration);
 
@@ -177,17 +166,19 @@ public class SpConsulServiceDiscovery extends AbstractConsulService implements I
     return false;
   }
 
-  private static ConsulServiceRegistrationBody createRegistrationBody(String svcGroup, String id, String host,
-                                                                      int port, List<String> tags) {
+  private ConsulServiceRegistrationBody createRegistrationBody(SpServiceRegistrationRequest req) {
     ConsulServiceRegistrationBody body = new ConsulServiceRegistrationBody();
-    body.setID(id);
-    body.setName(svcGroup);
-    body.setTags(tags);
-    body.setAddress(HTTP_PROTOCOL + host);
-    body.setPort(port);
+    body.setID(req.getSvcId());
+    body.setName(req.getSvcGroup());
+    body.setTags(asString(req.getTags()));
+    body.setAddress(HTTP_PROTOCOL + req.getHost());
+    body.setPort(req.getPort());
     body.setEnableTagOverride(true);
-    body.setCheck(new HealthCheckConfiguration("GET",
-            (HTTP_PROTOCOL + host + COLON + port), HEALTH_CHECK_INTERVAL));
+    body.setCheck(new HealthCheckConfiguration(
+            "GET",
+            (HTTP_PROTOCOL + req.getHost() + COLON + req.getPort() + req.getHealthCheckPath()),
+            HEALTH_CHECK_INTERVAL,
+            "60s"));
 
     return body;
   }
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/model/HealthCheckConfiguration.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/model/HealthCheckConfiguration.java
index 94d7342..ed5ca91 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/model/HealthCheckConfiguration.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/model/HealthCheckConfiguration.java
@@ -21,14 +21,19 @@ public class HealthCheckConfiguration {
     private String Method;
     private String http;
     private String interval;
+    private String deregister_critical_service_after;
 
     public HealthCheckConfiguration() {
     }
 
-    public HealthCheckConfiguration(String method, String http, String interval) {
+    public HealthCheckConfiguration(String method,
+                                    String http,
+                                    String interval,
+                                    String deregisterAfter) {
         Method = method;
         this.http = http;
         this.interval = interval;
+        this.deregister_critical_service_after = deregisterAfter;
     }
 
     public String getMethod() {
@@ -54,4 +59,12 @@ public class HealthCheckConfiguration {
     public void setInterval(String interval) {
         this.interval = interval;
     }
+
+    public String getDeregister_critical_service_after() {
+        return deregister_critical_service_after;
+    }
+
+    public void setDeregister_critical_service_after(String deregister_critical_service_after) {
+        this.deregister_critical_service_after = deregister_critical_service_after;
+    }
 }
diff --git a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
index f9adc7e..c7fb289 100644
--- a/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
+++ b/streampipes-service-extensions-base/src/main/java/org/apache/streampipes/service/extensions/base/StreamPipesExtensionsServiceBase.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.service.extensions.base;
 import org.apache.streampipes.container.base.StreamPipesServiceBase;
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +59,7 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic
                                        SpServiceDefinition serviceDef) throws UnknownHostException {
         this.startStreamPipesService(
                 serviceClass,
-                serviceDef.getServiceGroup(),
+                DefaultSpServiceGroups.EXT,
                 serviceDef.getServiceId(),
                 serviceDef.getDefaultPort()
         );
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IExtensionsServiceEndpointStorage.java
similarity index 70%
rename from streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java
rename to streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IExtensionsServiceEndpointStorage.java
index 861bbe3..e1b7903 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IRdfEndpointStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IExtensionsServiceEndpointStorage.java
@@ -18,15 +18,15 @@
 
 package org.apache.streampipes.storage.api;
 
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
 
 import java.util.List;
 
-public interface IRdfEndpointStorage {
+public interface IExtensionsServiceEndpointStorage {
 
-    void addRdfEndpoint(RdfEndpoint rdfEndpoint);
+    void addExtensionsServiceEndpoint(ExtensionsServiceEndpoint extensionsServiceEndpoint);
 
-    void removeRdfEndpoint(String rdfEndpointId);
+    void removeExtensionsServiceEndpoint(String extensionServiceEndpointId);
 
-    List<RdfEndpoint> getRdfEndpoints();
+    List<ExtensionsServiceEndpoint> getExtensionsServiceEndpoints();
 }
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
index cdb9ba4..54f8f27 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
@@ -41,7 +41,7 @@ public interface INoSqlStorage {
 
   IVisualizationStorage getVisualizationStorageApi();
 
-  IRdfEndpointStorage getRdfEndpointStorage();
+  IExtensionsServiceEndpointStorage getRdfEndpointStorage();
 
   IAssetDashboardStorage getAssetDashboardStorage();
 
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
index 608f1f3..288ae0f 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
@@ -76,8 +76,8 @@ public enum CouchDbStorageManager implements INoSqlStorage {
   }
 
   @Override
-  public IRdfEndpointStorage getRdfEndpointStorage() {
-    return new RdfEndpointStorageImpl();
+  public IExtensionsServiceEndpointStorage getRdfEndpointStorage() {
+    return new ExtensionsServiceEndpointStorageImpl();
   }
 
   @Override
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/RdfEndpointStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ExtensionsServiceEndpointStorageImpl.java
similarity index 55%
rename from streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/RdfEndpointStorageImpl.java
rename to streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ExtensionsServiceEndpointStorageImpl.java
index e2ffb70..c9d454f 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/RdfEndpointStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ExtensionsServiceEndpointStorageImpl.java
@@ -18,33 +18,32 @@
 
 package org.apache.streampipes.storage.couchdb.impl;
 
-import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
-import org.apache.streampipes.storage.api.IRdfEndpointStorage;
+import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
+import org.apache.streampipes.storage.api.IExtensionsServiceEndpointStorage;
 import org.apache.streampipes.storage.couchdb.dao.AbstractDao;
 import org.apache.streampipes.storage.couchdb.utils.Utils;
 
 import java.util.List;
 
-public class RdfEndpointStorageImpl extends AbstractDao<RdfEndpoint> implements IRdfEndpointStorage {
+public class ExtensionsServiceEndpointStorageImpl extends AbstractDao<ExtensionsServiceEndpoint> implements IExtensionsServiceEndpointStorage {
 
-    public RdfEndpointStorageImpl() {
-        super(Utils::getCouchDbRdfEndpointClient, RdfEndpoint.class);
-    }
+  public ExtensionsServiceEndpointStorageImpl() {
+    super(Utils::getCouchDbRdfEndpointClient, ExtensionsServiceEndpoint.class);
+  }
 
+  @Override
+  public void addExtensionsServiceEndpoint(ExtensionsServiceEndpoint extensionsServiceEndpoint) {
+    persist(extensionsServiceEndpoint);
+  }
 
-    @Override
-    public void addRdfEndpoint(RdfEndpoint rdfEndpoint) {
-        persist(rdfEndpoint);
-    }
+  @Override
+  public void removeExtensionsServiceEndpoint(String rdfEndpointId) {
+    delete(rdfEndpointId);
+  }
 
-    @Override
-    public void removeRdfEndpoint(String rdfEndpointId) {
-        delete(rdfEndpointId)
-;    }
-
-    @Override
-    public List<RdfEndpoint> getRdfEndpoints() {
-        return findAll();
-    }
+  @Override
+  public List<ExtensionsServiceEndpoint> getExtensionsServiceEndpoints() {
+    return findAll();
+  }
 
 }
diff --git a/ui/src/app/connect/services/data-marketplace.service.ts b/ui/src/app/connect/services/data-marketplace.service.ts
index 391b7f1..aca69a5 100644
--- a/ui/src/app/connect/services/data-marketplace.service.ts
+++ b/ui/src/app/connect/services/data-marketplace.service.ts
@@ -79,14 +79,14 @@ export class DataMarketplaceService {
 
   stopAdapter(adapter: AdapterDescriptionUnion): Observable<Message> {
     return this.http.post(this.adapterMasterUrl
-        + adapter.couchDBId
+        + adapter.id
         + "/stop", {})
         .pipe(map(response => Message.fromData(response as any)));
   }
 
   startAdapter(adapter: AdapterDescriptionUnion): Observable<Message> {
     return this.http.post(this.adapterMasterUrl
-        + adapter.couchDBId
+        + adapter.id
         + "/start", {})
         .pipe(map(response => Message.fromData(response as any)));;
   }
@@ -116,7 +116,7 @@ export class DataMarketplaceService {
         this.host +
         this.authStatusService.email +
         url +
-        adapter.couchDBId
+        adapter.id
     );
   }
 
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 4fa3957..9ac229d 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,7 +19,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2021-05-24 18:53:06.
+// Generated using typescript-generator version 2.27.744 on 2021-06-21 21:05:13.
 
 export class AbstractStreamPipesEntity {
     "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
@@ -120,6 +120,7 @@ export class Accuracy extends EventPropertyQualityDefinition {
 
 export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
     "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
+    _rev: string;
     appId: string;
     applicationLinks: ApplicationLink[];
     connectedTo: string[];
@@ -151,25 +152,26 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
         instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
         instance.internallyManaged = data.internallyManaged;
         instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
-        instance.dom = data.dom;
         instance.uri = data.uri;
+        instance.dom = data.dom;
+        instance._rev = data._rev;
         return instance;
     }
 }
 
 export class AdapterDescription extends NamedStreamPipesEntity {
     "@class": "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription" | "org.apache.streampipes.mod [...]
-    _rev: string;
     adapterId: string;
     adapterType: string;
     category: string[];
     config: StaticPropertyUnion[];
-    couchDBId: string;
     createdAt: number;
     eventGrounding: EventGrounding;
     icon: string;
+    id: string;
     rules: TransformationRuleDescriptionUnion[];
     schemaRules: any[];
+    selectedEndpointUrl: string;
     streamRules: any[];
     userName: string;
     valueRules: any[];
@@ -189,11 +191,11 @@ export class AdapterDescription extends NamedStreamPipesEntity {
         instance.rules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.rules);
         instance.category = __getCopyArrayFn(__identity<string>())(data.category);
         instance.createdAt = data.createdAt;
+        instance.selectedEndpointUrl = data.selectedEndpointUrl;
+        instance.id = data.id;
+        instance.streamRules = __getCopyArrayFn(__identity<any>())(data.streamRules);
         instance.valueRules = __getCopyArrayFn(__identity<any>())(data.valueRules);
         instance.schemaRules = __getCopyArrayFn(__identity<any>())(data.schemaRules);
-        instance.streamRules = __getCopyArrayFn(__identity<any>())(data.streamRules);
-        instance.couchDBId = data.couchDBId;
-        instance._rev = data._rev;
         return instance;
     }
 
@@ -979,6 +981,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
     correspondingPipeline: string;
     correspondingUser: string;
     inputStreams: SpDataStreamUnion[];
+    selectedEndpointUrl: string;
     staticProperties: StaticPropertyUnion[];
     statusInfoSettings: ElementStatusInfoSettings;
     streamRequirements: SpDataStreamUnion[];
@@ -1001,6 +1004,7 @@ export class InvocableStreamPipesEntity extends NamedStreamPipesEntity {
         instance.streamRequirements = __getCopyArrayFn(SpDataStream.fromDataUnion)(data.streamRequirements);
         instance.configured = data.configured;
         instance.uncompleted = data.uncompleted;
+        instance.selectedEndpointUrl = data.selectedEndpointUrl;
         return instance;
     }
 }
@@ -1647,9 +1651,9 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
         }
         const instance = target || new GenericAdapterSetDescription();
         super.fromData(data, instance);
-        instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
-        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.eventSchema = EventSchema.fromData(data.eventSchema);
+        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
+        instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
         return instance;
     }
 }
@@ -1666,9 +1670,9 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
         }
         const instance = target || new GenericAdapterStreamDescription();
         super.fromData(data, instance);
-        instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
-        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.eventSchema = EventSchema.fromData(data.eventSchema);
+        instance.formatDescription = FormatDescription.fromData(data.formatDescription);
+        instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
         return instance;
     }
 }