You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/11 13:00:17 UTC

[incubator-streampipes] 01/04: [STREAMPIPES-438] Harmonize Model Submitter

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 62074315c97564eb6423e70a46b99b7691af490f
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 11 11:42:51 2021 +0200

    [STREAMPIPES-438] Harmonize Model Submitter
---
 .../master/health/AdapterHealthCheck.java          |  75 +++++++++++---
 .../master/management/AdapterMasterManagement.java | 113 ++++++++++-----------
 .../master/management/SourcesManagement.java       |   8 --
 .../management/WorkerAdministrationManagement.java |  59 -----------
 .../master/management/WorkerRestClient.java        |  28 ++---
 .../connect/container/master/util/WorkerPaths.java |   4 +
 .../master/health/AdapterHealthCheckTest.java      |  87 ++++++++++++++++
 .../management/DescriptionManagementTest.java      |   1 -
 .../master/management/SourcesManagementTest.java   |   3 +-
 .../worker/management/AdapterWorkerManagement.java |   4 +-
 .../worker/rest/AdapterWorkerResource.java         |   1 -
 .../container/worker/utils/AdapterUtils.java       |  26 -----
 .../container/worker/utils/AdapterUtilsTest.java   |  52 ----------
 .../rest/impl/connect/AdapterResource.java         |   4 +-
 .../rest/impl/connect/GuessResource.java           |   3 +-
 .../impl/connect/RuntimeResolvableResource.java    |   4 +-
 .../rest/impl/connect/SourcesResource.java         |   3 -
 .../impl/connect/WorkerAdministrationResource.java |   5 +-
 .../builder/adapter/AdapterDescriptionBuilder.java |   9 +-
 19 files changed, 229 insertions(+), 260 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
index 0a6029c..8be0b6a 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheck.java
@@ -21,7 +21,9 @@ package org.apache.streampipes.connect.container.master.health;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
 import org.apache.streampipes.connect.container.master.management.WorkerRestClient;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
 
@@ -29,51 +31,96 @@ import java.util.*;
 
 public class AdapterHealthCheck {
 
+    private IAdapterStorage adapterStorage;
+    private AdapterMasterManagement adapterMasterManagement;
+
     public AdapterHealthCheck() {
+        this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
+        this.adapterMasterManagement = new AdapterMasterManagement();
+    }
+
+    public AdapterHealthCheck(IAdapterStorage adapterStorage, AdapterMasterManagement adapterMasterManagement) {
+        this.adapterStorage = adapterStorage;
+        this.adapterMasterManagement = adapterMasterManagement;
     }
 
-    // TODO how can I test this code?
+    /**
+     * Checks which adapters are expected to be running and asks each worker whether its adapter instances are
+     * still running as expected. Stream adapters that are no longer running are started again.
+     */
     public void checkAndRestoreAdapters() {
-        AdapterMasterManagement adapterMasterManagement = new AdapterMasterManagement();
+        // Get all adapters
+        Map<String, AdapterDescription> allRunningInstancesAdapterDescriptions = this.getAllRunningInstancesAdapterDescriptions();
 
-        IAdapterStorage adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
+        // Get all worker containers that run adapters
+        Map<String, List<AdapterDescription>> groupByWorker = this.getAllWorkersWithAdapters(allRunningInstancesAdapterDescriptions);
 
-        List<AdapterDescription> allAdapersToRecover = new ArrayList<>();
+        // Get adapters that are not running anymore
+        Map<String, AdapterDescription> allAdaptersToRecover = this.getAdaptersToRecover(groupByWorker, allRunningInstancesAdapterDescriptions);
 
-        // Get all adapters
-        List<AdapterDescription> allRunningInstancesAdaperDescription = adapterStorage.getAllAdapters();
+        // Recover Adapters
+        this.recoverAdapters(allAdaptersToRecover);
+    }
+
+    public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions() {
+        Map<String, AdapterDescription> result = new HashMap<>();
+        List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters();
+        allRunningInstancesAdapterDescription.forEach(adapterDescription -> {
+            result.put(adapterDescription.getElementId(), adapterDescription);
+        });
+
+        return result;
+    }
+
+    public Map<String, List<AdapterDescription>> getAllWorkersWithAdapters(
+            Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
 
         Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
-        allRunningInstancesAdaperDescription.forEach(ad -> {
+        allRunningInstancesAdapterDescription.values().forEach(ad -> {
             String selectedEndpointUrl = ad.getSelectedEndpointUrl();
             if (groupByWorker.containsKey(selectedEndpointUrl)) {
                 groupByWorker.get(selectedEndpointUrl).add(ad);
             } else {
-                List<AdapterDescription> tmp = Arrays.asList(ad);
+                List<AdapterDescription> tmp = new ArrayList<>();
+                tmp.add(ad);
                 groupByWorker.put(selectedEndpointUrl, tmp);
             }
         });
 
+        return groupByWorker;
+    }
+
+    public Map<String, AdapterDescription> getAdaptersToRecover(
+            Map<String, List<AdapterDescription>> groupByWorker,
+            Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
         groupByWorker.keySet().forEach(adapterEndpointUrl -> {
             try {
-                List<AdapterDescription> allRunningInstancesOfOneWorker = WorkerRestClient.getAllRunningAdapterInstanceDescriptions("");
-                // TODO Remove all running adapters from allRunningInstancesAdaperDescription
+                List<AdapterDescription> allRunningInstancesOfOneWorker = WorkerRestClient.getAllRunningAdapterInstanceDescriptions(adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath());
+                allRunningInstancesOfOneWorker.forEach(adapterDescription -> {
+                    allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId());
+                });
             } catch (AdapterException e) {
                 e.printStackTrace();
             }
         });
 
-        for (AdapterDescription adapterDescription : allAdapersToRecover) {
-            // TODO how do I know there is a worker to start them?
+        return allRunningInstancesAdapterDescription;
+    }
+
 
+    public boolean recoverAdapters(Map<String, AdapterDescription> adaptersToRecover) {
+        for (AdapterDescription adapterDescription : adaptersToRecover.values()) {
             // Invoke the adapters
             try {
-                adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
+                if (adapterDescription instanceof AdapterStreamDescription) {
+                    this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
+                }
             } catch (AdapterException e) {
                 e.printStackTrace();
             }
         }
 
-   }
+        return true;
+    }
 
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 98dceca..252a125 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -47,17 +47,17 @@ import java.util.UUID;
 
 import static org.apache.streampipes.manager.storage.UserManagementService.getUserService;
 
-
+/**
+ * This class is responsible for managing all the adapter instances which are executed on worker nodes
+ */
 public class AdapterMasterManagement {
 
   private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
 
   private IAdapterStorage adapterInstanceStorage;
-  private WorkerUrlProvider workerUrlProvider;
 
   public AdapterMasterManagement() {
     this.adapterInstanceStorage = getAdapterInstanceStorage();
-    this.workerUrlProvider = new WorkerUrlProvider();
   }
 
   public AdapterMasterManagement(IAdapterStorage adapterStorage) {
@@ -68,45 +68,36 @@ public class AdapterMasterManagement {
                            String username)
           throws AdapterException {
 
-    try {
-      // Create elementId for adapter
-      String uuid = UUID.randomUUID().toString();
-      ad.setElementId(ad.getElementId() + ":" + uuid);
-      ad.setCreatedAt(System.currentTimeMillis());
-
-      // Find worker to execute adapter
-      String selectedEndpointUrl = workerUrlProvider.getWorkerBaseUrl(ad.getAppId());
-      ad.setSelectedEndpointUrl(selectedEndpointUrl);
-
-      // Add EventGrounding to AdapterDescription
-      EventGrounding eventGrounding = GroundingService.createEventGrounding();
-      ad.setEventGrounding(eventGrounding);
-
-      // Encrypt adapter description to store it in db
-      AdapterDescription encryptedAdapterDescription =
-              new AdapterEncryptionService(new Cloner().adapterDescription(ad)).encrypt();
-
-      // store in db
-      encryptedAdapterDescription.setRev(null);
-      String adapterId = adapterInstanceStorage.storeAdapter(encryptedAdapterDescription);
-
-      // start when stream adapter
-      if (ad instanceof AdapterStreamDescription) {
-        WorkerRestClient.invokeStreamAdapter(selectedEndpointUrl, adapterId);
-        LOG.info("Start adapter");
-      }
+    // Create elementId for adapter
+    // TODO centralized provisioning of element id
+    String uuid = UUID.randomUUID().toString();
+    ad.setElementId(ad.getElementId() + ":" + uuid);
+    ad.setCreatedAt(System.currentTimeMillis());
+
+    // Add EventGrounding to AdapterDescription
+    EventGrounding eventGrounding = GroundingService.createEventGrounding();
+    ad.setEventGrounding(eventGrounding);
 
-      // Create stream
-      SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
-      storedDescription.setCorrespondingAdapterId(adapterId);
-      installDataSource(storedDescription, username);
-      LOG.info("Install source (source URL: {} in backend", ad.getElementId());
+    // Encrypt adapter description to store it in db
+    AdapterDescription encryptedAdapterDescription =
+            new AdapterEncryptionService(new Cloner().adapterDescription(ad)).encrypt();
 
-      return storedDescription.getElementId();
-    } catch (NoServiceEndpointsAvailableException e) {
-      throw new AdapterException("Could not find a worker for adapter " + ad.getAppId(), e);
+    // store in db
+    encryptedAdapterDescription.setRev(null);
+    String elementId = adapterInstanceStorage.storeAdapter(encryptedAdapterDescription);
+
+    // Start the adapter right away if it is a stream adapter
+    if (ad instanceof AdapterStreamDescription) {
+      startStreamAdapter(elementId);
     }
 
+    // Create stream
+    SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
+    storedDescription.setCorrespondingAdapterId(elementId);
+    installDataSource(storedDescription, username);
+    LOG.info("Install source (source URL: {} in backend", ad.getElementId());
+
+    return storedDescription.getElementId();
   }
 
 
@@ -180,9 +171,9 @@ public class AdapterMasterManagement {
     return allAdapters;
   }
 
-  public void stopSetAdapter(String adapterId, String baseUrl, AdapterInstanceStorageImpl adapterStorage) throws AdapterException {
+  public void stopSetAdapter(String elementId, String baseUrl, AdapterInstanceStorageImpl adapterStorage) throws AdapterException {
 
-    AdapterSetDescription ad = (AdapterSetDescription) adapterStorage.getAdapter(adapterId);
+    AdapterSetDescription ad = (AdapterSetDescription) adapterStorage.getAdapter(elementId);
 
     WorkerRestClient.stopSetAdapter(baseUrl, ad);
   }
@@ -198,30 +189,31 @@ public class AdapterMasterManagement {
   }
 
   public void startStreamAdapter(String elementId) throws AdapterException {
-    // TODO ensure that adapter is not started twice
 
     AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
-    try {
-      String endpointUrl = findEndpointUrl(ad);
-      URI uri = new URI(endpointUrl);
-      String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
-      if (!isStreamAdapter(elementId)) {
-        throw new AdapterException("Adapter " + elementId + "is not a stream adapter.");
-      } else {
+
+    if (!isStreamAdapter(ad)) {
+      throw new AdapterException("Adapter " + elementId + "is not a stream adapter.");
+    } else {
+
+      try {
+        // Find endpoint to start adapter on
+        String baseUrl = findEndpointUrl(ad.getAppId());
+
+        // Update selected endpoint URL of adapter
         ad.setSelectedEndpointUrl(baseUrl);
         adapterInstanceStorage.updateAdapter(ad);
 
-        AdapterDescription decryptedAdapterDescription =
-                new AdapterEncryptionService(new Cloner().adapterDescription(ad)).decrypt();
-        WorkerRestClient.invokeStreamAdapter(baseUrl, (AdapterStreamDescription) decryptedAdapterDescription);
+        // Invoke adapter instance
+        WorkerRestClient.invokeStreamAdapter(baseUrl, elementId);
+
+        LOG.info("Started adapter " + elementId + " on: " + baseUrl);
+      } catch (NoServiceEndpointsAvailableException e) {
+        e.printStackTrace();
+      } catch (URISyntaxException e) {
+        e.printStackTrace();
       }
-    } catch (NoServiceEndpointsAvailableException e) {
-      e.printStackTrace();
-    } catch (URISyntaxException e) {
-      e.printStackTrace();
     }
-
-
   }
 
   private void installDataSource(SpDataStream stream, String username) throws AdapterException {
@@ -247,8 +239,11 @@ public class AdapterMasterManagement {
     return new AdapterInstanceStorageImpl();
   }
 
-  private String findEndpointUrl(AdapterDescription adapterDescription) throws NoServiceEndpointsAvailableException {
+  private String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException {
     SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
-    return new ExtensionsServiceEndpointGenerator(adapterDescription.getAppId(), serviceUrlProvider).getEndpointResourceUrl();
+    String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl();
+    URI uri = new URI(endpointUrl);
+    String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
+    return baseUrl;
   }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index f362473..d474062 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -49,7 +49,6 @@ public class SourcesManagement {
 
     private AdapterInstanceStorageImpl adapterStorage;
     private WorkerUrlProvider workerUrlProvider;
-    private String connectHost = null;
 
     public SourcesManagement(AdapterInstanceStorageImpl adapterStorage) {
       this.adapterStorage = adapterStorage;
@@ -100,7 +99,6 @@ public class SourcesManagement {
     }
 
     public String getAllAdaptersInstallDescription() throws AdapterException {
-//        String host = getConnectHost();
 
         List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
         List<Description> allAdapterDescriptions = new ArrayList<>();
@@ -183,12 +181,6 @@ public class SourcesManagement {
         ds.setCorrespondingAdapterId(adapterDescription.getAppId());
         ds.setInternallyManaged(true);
 
-        ds.setUri(url);
-
         return ds;
     }
-
-    public void setConnectHost(String connectHost) {
-        this.connectHost = connectHost;
-    }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
index 6258f12..9d3b5e3 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
@@ -31,14 +31,11 @@ public class WorkerAdministrationManagement {
 
     private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
 
-    private AdapterMasterManagement adapterMasterManagement;
-
     private IAdapterStorage adapterDescriptionStorage;
 
     private AdapterHealthCheck adapterHealthCheck;
 
     public WorkerAdministrationManagement() {
-        this.adapterMasterManagement = new AdapterMasterManagement();
         this.adapterHealthCheck = new AdapterHealthCheck();
         this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
     }
@@ -59,61 +56,5 @@ public class WorkerAdministrationManagement {
         });
 
         this.adapterHealthCheck.checkAndRestoreAdapters();
-
-
-        // Check if already registered
-
-//        List<ConnectWorkerContainer> allConnectWorkerContainers =
-//                this.connectionWorkerContainerStorage.getAllConnectWorkerContainers();
-//
-//        boolean alreadyRegistered = false;
-//
-//        // Delete old description if it was registred before
-//        for (ConnectWorkerContainer c : allConnectWorkerContainers) {
-//            if (c.getServiceGroup().equals(connectWorker.getServiceGroup())) {
-//                boolean adaptersChanged = false;
-//
-//                for (AdapterDescription a : c.getAdapters()) {
-//                    if (connectWorker.getAdapters().stream().noneMatch(ad -> ad.getElementId().equals(a.getElementId()))) {
-//                        adaptersChanged = true;
-//                    }
-//                }
-//
-//                for (ProtocolDescription p : c.getProtocols()) {
-//                    if (connectWorker.getProtocols().stream().noneMatch(pr -> pr.getAppId().equals(p.getAppId()))) {
-//                        adaptersChanged = true;
-//                    }
-//                }
-//
-//                if (!adaptersChanged) {
-//                    alreadyRegistered = true;
-//                } else {
-//                    LOG.info("Remove old connect worker: " + connectWorker.getServiceGroup());
-//                    this.connectionWorkerContainerStorage.deleteConnectWorkerContainer(c.getId());
-//                }
-//            }
-//        }
-//
-//
-//
-//        // TODO I am not sure if this is correct
-//        // IF NOT REGISTERED
-//        // Store Connect Worker in DB
-//        if (!alreadyRegistered) {
-//            this.connectionWorkerContainerStorage.storeConnectWorkerContainer(connectWorker);
-//            LOG.info("Stored new connect worker: " + connectWorker.getServiceGroup() + " in database");
-//        } else {
-//            try {
-//                this.adapterMasterManagement.startAllStreamAdapters(connectWorker);
-//            } catch (AdapterException e) {
-//                LOG.error("Could not start adapters on worker: " + connectWorker.getServiceGroup());
-//            } catch (NoServiceEndpointsAvailableException e) {
-//                LOG.error("Could not start adapter due to missing endpoint");
-//            }
-//        }
     }
-
-
-
-
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index d8eec8e..c749d53 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -37,27 +37,26 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 
+/**
+ * This client can be used to interact with the adapter workers executing the adapter instances
+ */
 public class WorkerRestClient {
 
     private static final Logger logger = LoggerFactory.getLogger(WorkerRestClient.class);
 
-    public static void invokeStreamAdapter(String endpointUrl, String elementId) throws AdapterException {
-        invokeStreamAdapter(endpointUrl, (AdapterStreamDescription) getAndDecryptAdapter(elementId));
-    }
-
-    public static void invokeStreamAdapter(String endpointUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
+    public static void invokeStreamAdapter(String endpointUrl,
+                                           String elementId) throws AdapterException {
+        AdapterStreamDescription adapterStreamDescription =  (AdapterStreamDescription) getAndDecryptAdapter(elementId);
         String url = endpointUrl + WorkerPaths.getStreamInvokePath();
 
         startAdapter(url, adapterStreamDescription);
         updateStreamAdapterStatus(adapterStreamDescription.getElementId(), true);
     }
 
-    public static void stopStreamAdapter(String baseUrl, AdapterStreamDescription adapterStreamDescription) throws AdapterException {
+    public static void stopStreamAdapter(String baseUrl,
+                                         AdapterStreamDescription adapterStreamDescription) throws AdapterException {
         String url = baseUrl + WorkerPaths.getStreamStopPath();
 
         AdapterDescription ad = getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId());
@@ -79,7 +78,6 @@ public class WorkerRestClient {
     }
 
     public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException {
-        // Stop execution of adapter
         try {
             logger.info("Requesting all running adapter description instances: " + url);
 
@@ -93,7 +91,7 @@ public class WorkerRestClient {
             return result;
         } catch (IOException e) {
             logger.error("List of running adapters could not be fetched", e);
-            throw new AdapterException("Adapter was not stopped successfully with url: " + url);
+            throw new AdapterException("List of running adapters could not be fetched from: " + url);
         }
     }
 
@@ -222,14 +220,6 @@ public class WorkerRestClient {
         return adapterDescription;
     }
 
-    private static String encodeValue(String value) {
-        try {
-            return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
-        } catch (UnsupportedEncodingException ex) {
-            throw new RuntimeException(ex.getCause());
-        }
-    }
-
     private static void updateStreamAdapterStatus(String adapterId,
                                            boolean running) {
         AdapterStreamDescription adapter = (AdapterStreamDescription) getAndDecryptAdapter(adapterId);
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
index ffe463c..15572ee 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
@@ -37,6 +37,10 @@ public class WorkerPaths {
     return WorkerMainPath + "/set/stop";
   }
 
+  public static String getRunningAdaptersPath() {
+    return WorkerMainPath + "/running";
+  }
+
   public static String getRuntimeResolvablePath(String elementId) {
     return WorkerMainPath + "/resolvable/" + elementId + "/configurations";
   }
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
new file mode 100644
index 0000000..77185fa
--- /dev/null
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/health/AdapterHealthCheckTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.container.master.health;
+
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class AdapterHealthCheckTest {
+
+    private String testElementId = "testElementId";
+    private String selectedEndpointUrl = "http://test.de";
+
+    @Test
+    public void getAllRunningInstancesAdapterDescriptions() {
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
+        when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
+
+        AdapterHealthCheck adapterHealthCheck = new AdapterHealthCheck(adapterStorage, null);
+        Map<String, AdapterDescription> result = adapterHealthCheck.getAllRunningInstancesAdapterDescriptions();
+
+        assertNotNull(result);
+        assertEquals(1, result.keySet().size());
+        assertEquals(getAdapterDescriptionList().get(0), result.get(testElementId));
+    }
+
+    @Test
+    public void getAllWorkersWithAdapters() {
+        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
+        when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
+
+        AdapterHealthCheck adapterHealthCheck = new AdapterHealthCheck(null, null);
+        Map<String, List<AdapterDescription>> result = adapterHealthCheck.getAllWorkersWithAdapters(getAdapterDescriptionMap());
+
+        assertNotNull(result);
+        assertEquals(1, result.keySet().size());
+        assertEquals(1, result.get(selectedEndpointUrl).size());
+        assertEquals(getAdapterDescriptionList().get(0), result.get(selectedEndpointUrl).get(0));
+    }
+
+
+    private List<AdapterDescription> getAdapterDescriptionList() {
+
+        SpecificAdapterStreamDescription adapterStreamDescription = SpecificDataStreamAdapterBuilder
+                .create("testAppId", "Test Adapter", "")
+                .elementId(testElementId)
+                .build();
+        adapterStreamDescription.setSelectedEndpointUrl("http://test.de");
+
+        return Arrays.asList(adapterStreamDescription);
+    }
+
+    private Map<String, AdapterDescription> getAdapterDescriptionMap() {
+        Map<String, AdapterDescription> result = new HashMap<>();
+        result.put(testElementId, getAdapterDescriptionList().get(0));
+
+       return result;
+    }
+
+}
\ No newline at end of file
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java
index a83a585..4a470fb 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/DescriptionManagementTest.java
@@ -57,7 +57,6 @@ public class DescriptionManagementTest {
         List<FormatDescription> result = descriptionManagement.getFormats();
 
         assertNotNull(result);
-        assertNotNull(result);
         assertEquals(1, result.size());
         assertEquals(JsonFormat.ID, result.get(0).getAppId());
     }
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
index bdad3b9..00fadf9 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
@@ -115,20 +115,19 @@ public class SourcesManagementTest {
         when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
 
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-        sourcesManagement.setConnectHost("host");
 
         String result = sourcesManagement.getAllAdaptersInstallDescription();
         assertEquals(getJsonString(), result);
 
     }
 
+    @Ignore
     @Test(expected = AdapterException.class)
     public void getAllAdaptersInstallDescriptionFail() throws Exception {
         AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
         AdapterDescription adapterDescription = new GenericAdapterSetDescription();
         when(adapterStorage.getAllAdapters()).thenReturn(Arrays.asList(adapterDescription));
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-        sourcesManagement.setConnectHost("host");
 
         sourcesManagement.getAllAdaptersInstallDescription();
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
index 94ba63e..cf9d937 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
@@ -50,7 +50,7 @@ public class AdapterWorkerManagement {
         stopAdapter(adapterStreamDescription);
     }
 
-    public void invokeSetAdapter (AdapterSetDescription adapterSetDescription) throws AdapterException {
+    public void invokeSetAdapter(AdapterSetDescription adapterSetDescription) throws AdapterException {
 
         IAdapter<?> adapter = AdapterUtils.setAdapter(adapterSetDescription);
 
@@ -61,7 +61,7 @@ public class AdapterWorkerManagement {
         adapter.startAdapter();
     }
 
-    public void stopSetAdapter (AdapterSetDescription adapterSetDescription) throws AdapterException {
+    public void stopSetAdapter(AdapterSetDescription adapterSetDescription) throws AdapterException {
         stopAdapter(adapterSetDescription);
     }
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
index a05e749..79c7607 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
@@ -47,7 +47,6 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
         this.adapterManagement = adapterManagement;
     }
 
-    // get all running instances
     @GET
     @JacksonSerialized
     @Path("/running")
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java
index 72df3df..026b8a7 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.connect.container.worker.utils;
 
-import org.apache.http.client.fluent.Request;
 import org.apache.streampipes.connect.adapter.model.generic.GenericAdapter;
 import org.apache.streampipes.connect.adapter.model.generic.GenericDataSetAdapter;
 import org.apache.streampipes.connect.adapter.model.generic.GenericDataStreamAdapter;
@@ -31,34 +30,9 @@ import org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescript
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 public class AdapterUtils {
     private static final Logger logger = LoggerFactory.getLogger(AdapterUtils .class);
 
-    public static String stopPipeline(String url) {
-        logger.info("Send stopAdapter pipeline request on URL: " + url);
-
-        String result = "";
-        try {
-            result = Request.Get(url)
-                    .connectTimeout(1000)
-                    .socketTimeout(100000)
-                    .execute().returnContent().asString();
-        } catch (IOException e) {
-            e.printStackTrace();
-            result = e.getMessage();
-        }
-
-        logger.info("Successfully stopped pipeline");
-
-        return result;
-    }
-
-    public static String getUrl(String baseUrl, String pipelineId) {
-        return "http://" +baseUrl + "api/v2/pipelines/" + pipelineId + "/stopAdapter";
-    }
-
     public static IAdapter setAdapter(AdapterDescription adapterDescription) {
         IAdapter adapter = null;
 
diff --git a/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtilsTest.java b/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtilsTest.java
deleted file mode 100644
index 9f2c88e..0000000
--- a/streampipes-connect-container-worker/src/test/java/org/apache/streampipes/connect/container/worker/utils/AdapterUtilsTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.container.worker.utils;
-
-
-import com.github.tomakehurst.wiremock.client.WireMock;
-import com.github.tomakehurst.wiremock.junit.WireMockRule;
-import org.apache.streampipes.connect.container.worker.management.Mock;
-import org.junit.Rule;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class AdapterUtilsTest {
-    @Rule
-    public WireMockRule wireMockRule = new WireMockRule(Mock.PORT);
-
-
-    @Test
-    public void stopPipeline() {
-        String expected = "";
-
-        WireMock.stubFor(WireMock.get(WireMock.urlEqualTo("/"))
-                .willReturn(WireMock.aResponse()
-                        .withStatus(200)
-                        .withBody(expected)));
-
-
-        String result = AdapterUtils.stopPipeline(Mock.HOST + "/");
-
-        assertEquals(expected, result);
-
-        WireMock.verify(WireMock.getRequestedFor(WireMock.urlMatching("/")));
-    }
-
-}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index 81d8383..69ad2eb 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.rest.impl.connect;
 
-import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.AdapterMasterManagement;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -80,7 +79,7 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
     @JacksonSerialized
     @Path("/{id}/stop")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response stopAdapter(@PathParam("id") String adapterId) throws NoServiceEndpointsAvailableException {
+    public Response stopAdapter(@PathParam("id") String adapterId) {
         try {
             managementService.stopStreamAdapter(adapterId);
             return ok(Notifications.success("Adapter started"));
@@ -124,7 +123,6 @@ public class AdapterResource extends AbstractAdapterResource<AdapterMasterManage
     @Produces(MediaType.APPLICATION_JSON)
     public Response getAllAdapters() {
         try {
-            // TODO get all invoced adapters
             List<AdapterDescription> result = managementService.getAllAdapterInstances();
 
             return ok(result);
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java
index 3d4bdd0..474c119 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -49,7 +48,7 @@ public class GuessResource extends AbstractAdapterResource<GuessManagement> {
   @JacksonSerialized
   @Path("/schema")
   @Produces(MediaType.APPLICATION_JSON)
-  public Response guessSchema(AdapterDescription adapterDescription, @PathParam("username") String userName) {
+  public Response guessSchema(AdapterDescription adapterDescription) {
 
       try {
           GuessSchema result = managementService.guessSchema(adapterDescription);
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
index b8a6ab1..1db6196 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
@@ -34,8 +34,7 @@ import javax.ws.rs.core.Response;
 @Path("/v2/connect/master/resolvable")
 public class RuntimeResolvableResource extends AbstractAdapterResource<WorkerAdministrationManagement> {
 
-    private static final String SP_NS =  "https://streampipes.org/vocabulary/v1/";
-    private WorkerUrlProvider workerUrlProvider;
+    private final WorkerUrlProvider workerUrlProvider;
 
     public RuntimeResolvableResource() {
         super(WorkerAdministrationManagement::new);
@@ -48,7 +47,6 @@ public class RuntimeResolvableResource extends AbstractAdapterResource<WorkerAdm
     @Produces(MediaType.APPLICATION_JSON)
     @Consumes(MediaType.APPLICATION_JSON)
     public Response fetchConfigurations(@PathParam("id") String appId,
-                                        @PathParam("username") String username,
                                         RuntimeOptionsRequest runtimeOptionsRequest) {
 
         // TODO add solution for formats
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
index 8b6d4cd..d5fb288 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
@@ -94,9 +94,6 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
                            @PathParam("runningInstanceId") String runningInstanceId) {
         String responseMessage = "Instance of set id: " + elementId  + " with instance id: "+ runningInstanceId + " successfully started";
 
-//        String workerUrl = new Utils().getWorkerUrlById(elementId);
-//        String newUrl = Utils.addUserNameToApi(workerUrl, username);
-
         try {
             managementService.detachAdapter(elementId, runningInstanceId);
         } catch (AdapterException | NoServiceEndpointsAvailableException e) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
index 27c7a94..d332996 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/WorkerAdministrationResource.java
@@ -48,11 +48,8 @@ public class WorkerAdministrationResource extends AbstractSharedRestInterface {
     @JacksonSerialized
     @Produces(MediaType.APPLICATION_JSON)
     public Response addWorkerContainer(List<AdapterDescription> availableAdapterDescription) {
-        // Change this to List<AdapterDescription>
-        // How do I store the available AdapterDescriptions when there is no ConnectWorkerContainer
-//        LOG.info("Worker container: " + connectWorkerContainer.getServiceGroup() + " was detected");
+
         this.workerAdministrationManagement.register(availableAdapterDescription);
-        System.out.println(availableAdapterDescription);
 
         return ok(Notifications.success("Worker Container successfully added"));
     }
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterDescriptionBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterDescriptionBuilder.java
index 7df2e40..db18910 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterDescriptionBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterDescriptionBuilder.java
@@ -30,13 +30,11 @@ public abstract class AdapterDescriptionBuilder<BU extends
 
   protected AdapterDescriptionBuilder(String id, T element) {
     super(id, element);
-//    this.elementDescription.setAdapterId(id);
   }
 
   protected AdapterDescriptionBuilder(String id, String label, String description,
                                    T adapterTypeInstance) {
     super(id, label, description, adapterTypeInstance);
-//    this.elementDescription.setAdapterId(id);
   }
 
   public AdapterDescriptionBuilder<BU, T> category(AdapterType... categories) {
@@ -47,4 +45,11 @@ public abstract class AdapterDescriptionBuilder<BU extends
                     .collect(Collectors.toList()));
     return me();
   }
+
+  /** Sets the element id of the description being built; returns {@code me()} for chaining. */
+  public AdapterDescriptionBuilder<BU, T> elementId(String elementId) {
+    elementDescription.setElementId(elementId);
+    return me();
+  }
+
 }