You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/11 13:00:19 UTC

[incubator-streampipes] 03/04: [STREAMPIPES-438] Fix data set adapters

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 5897ade5217c449991659c93547b0533a068eb6a
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 11 14:45:21 2021 +0200

    [STREAMPIPES-438] Fix data set adapters
---
 .../master/management/AdapterMasterManagement.java | 46 ++++++-----
 .../master/management/SourcesManagement.java       | 90 +++++++---------------
 .../master/management/WorkerRestClient.java        |  4 +-
 .../connect/container/master/util/WorkerPaths.java | 15 +++-
 .../master/management/SourcesManagementTest.java   | 29 +------
 .../worker/rest/AdapterWorkerResource.java         |  8 +-
 .../manager/setup/CouchDbInstallationStep.java     |  3 +-
 .../rest/impl/connect/SourcesResource.java         | 33 +-------
 .../tests/pipelineElement/AllPipelineElements.ts   |  1 -
 9 files changed, 75 insertions(+), 154 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 5ada040..a6d28f6 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -23,7 +23,7 @@ import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.connect.adapter.GroundingService;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
-import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
 import org.apache.streampipes.manager.storage.UserService;
 import org.apache.streampipes.manager.verification.DataStreamVerifier;
 import org.apache.streampipes.model.SpDataStream;
@@ -36,14 +36,13 @@ import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorageCache;
 import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 import org.apache.streampipes.storage.management.StorageManager;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.apache.streampipes.manager.storage.UserManagementService.getUserService;
 
@@ -92,7 +91,7 @@ public class AdapterMasterManagement {
     }
 
     // Create stream
-    SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
+    SpDataStream storedDescription = new SourcesManagement().createAdapterDataStream(ad);
     storedDescription.setCorrespondingAdapterId(elementId);
     installDataSource(storedDescription, username);
     LOG.info("Install source (source URL: {} in backend", ad.getElementId());
@@ -116,37 +115,43 @@ public class AdapterMasterManagement {
   }
 
   /**
-   * First the adapter is stopped removed, then the according data source is deleted
+   * First the adapter is stopped removed, then the corresponding data source is deleted
    * @param elementId
    * @throws AdapterException
    */
   public void deleteAdapter(String elementId) throws AdapterException {
-    // IF Stream adapter delete it
-    boolean isStreamAdapter = isStreamAdapter(elementId);
 
-    if (isStreamAdapter) {
+    // Stop stream adapter
+    if (isStreamAdapter(elementId)) {
       try {
         stopStreamAdapter(elementId);
       } catch (AdapterException e) {
-        LOG.info("Could not stop adapter: " + elementId);
-        LOG.info(e.toString());
+        LOG.info("Could not stop adapter: " + elementId, e);
       }
     }
 
-
+    // Delete adapter
     AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
-    String username = ad.getUserName();
     adapterInstanceStorage.deleteAdapter(elementId);
     LOG.info("Successfully deleted adapter: " + elementId);
 
+    // Delete data stream
     UserService userService = getUserService();
     IPipelineElementDescriptionStorageCache requestor = StorageManager.INSTANCE.getPipelineElementStorage();
-
-    if (requestor.getDataStreamById(ad.getElementId()) != null) {
-      requestor.deleteDataStream(requestor.getDataStreamById(ad.getElementId()));
-      userService.deleteOwnSource(username, ad.getElementId());
+    List<SpDataStream> streamsToDelete = requestor
+            .getAllDataStreams()
+            .stream()
+            .filter(spDataStream -> spDataStream.getCorrespondingAdapterId().equals(elementId))
+            .collect(Collectors.toList());
+    String username = ad.getUserName();
+    if (streamsToDelete.size() > 0) {
+      SpDataStream streamToDelete = streamsToDelete.get(0);
+      requestor.deleteDataStream(streamToDelete);
+      userService.deleteOwnSource(username, streamToDelete.getElementId());
       requestor.refreshDataSourceCache();
+      LOG.info("Successfully deleted data stream: " + streamToDelete.getElementId());
     }
+
   }
 
   public List<AdapterDescription> getAllAdapterInstances() throws AdapterException {
@@ -200,7 +205,7 @@ public class AdapterMasterManagement {
 
       try {
         // Find endpoint to start adapter on
-        String baseUrl = findEndpointUrl(ad.getAppId());
+        String baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId());
 
         // Update selected endpoint URL of adapter
         ad.setSelectedEndpointUrl(baseUrl);
@@ -242,11 +247,4 @@ public class AdapterMasterManagement {
     return new AdapterInstanceStorageImpl();
   }
 
-  private String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException {
-    SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
-    String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl();
-    URI uri = new URI(endpointUrl);
-    String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
-    return baseUrl;
-  }
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index ee54fdd..8e3e284 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -22,9 +22,7 @@ import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableExce
 import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
-import org.apache.streampipes.container.html.JSONGenerator;
-import org.apache.streampipes.container.html.model.DataSourceDescriptionHtml;
-import org.apache.streampipes.container.html.model.Description;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -37,9 +35,7 @@ import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -47,11 +43,11 @@ public class SourcesManagement {
 
     private Logger logger = LoggerFactory.getLogger(SourcesManagement.class);
 
-    private AdapterInstanceStorageImpl adapterStorage;
+    private AdapterInstanceStorageImpl adapterInstanceStorage;
     private WorkerUrlProvider workerUrlProvider;
 
     public SourcesManagement(AdapterInstanceStorageImpl adapterStorage) {
-      this.adapterStorage = adapterStorage;
+      this.adapterInstanceStorage = adapterStorage;
       this.workerUrlProvider = new WorkerUrlProvider();
     }
 
@@ -59,23 +55,26 @@ public class SourcesManagement {
        this(new AdapterInstanceStorageImpl());
     }
 
-    public void addAdapter(String streamId,
-                           SpDataSet dataSet) throws AdapterException, NoServiceEndpointsAvailableException {
+    public void addSetAdapter(SpDataSet dataSet) throws AdapterException, NoServiceEndpointsAvailableException {
 
+        AdapterSetDescription ad = (AdapterSetDescription) getAndDecryptAdapter(dataSet.getCorrespondingAdapterId());
+        ad.setDataSet(dataSet);
+        ad.setElementId(ad.getElementId() + "/streams/" + dataSet.getDatasetInvocationId());
 
-        String newUrl = getAdapterUrl(streamId);
-        AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
-        adapterDescription.setDataSet(dataSet);
-
-        String newId = adapterDescription.getElementId() + "/streams/" + dataSet.getDatasetInvocationId();
-        adapterDescription.setElementId(newId);
-
-        AdapterSetDescription decryptedAdapterDescription =
-                (AdapterSetDescription) new Cloner().adapterDescription(adapterDescription);
-
-        WorkerRestClient.invokeSetAdapter(newUrl, decryptedAdapterDescription);
+        try {
+            String baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId());
+            WorkerRestClient.invokeSetAdapter(baseUrl, ad);
+        } catch (URISyntaxException e) {
+            e.printStackTrace();
+        }
     }
 
+    /**
+     * @param streamId
+     * @param runningInstanceId
+     * @throws AdapterException
+     * @throws NoServiceEndpointsAvailableException
+     */
     public void detachAdapter(String streamId,
                               String runningInstanceId) throws AdapterException, NoServiceEndpointsAvailableException {
         AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
@@ -89,7 +88,7 @@ public class SourcesManagement {
 
     private String getAdapterUrl(String streamId) throws NoServiceEndpointsAvailableException {
         String appId = "";
-        List<AdapterDescription> adapterDescriptions = this.adapterStorage.getAllAdapters();
+        List<AdapterDescription> adapterDescriptions = this.adapterInstanceStorage.getAllAdapters();
         for (AdapterDescription ad : adapterDescriptions) {
             if (ad.getElementId().contains(streamId)) {
                 appId = ad.getAppId();
@@ -100,43 +99,9 @@ public class SourcesManagement {
 
     }
 
-    public String getAllAdaptersInstallDescription() throws AdapterException {
-
-        List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
-        List<Description> allAdapterDescriptions = new ArrayList<>();
-
-        for (AdapterDescription ad : allAdapters) {
-            URI uri;
-            String uriString = null;
-            try {
-                uriString = ad.getElementId();
-                uri = new URI(uriString);
-            } catch (URISyntaxException e) {
-                logger.error("URI for the sources endpoint is not correct: " + uriString, e);
-                throw new AdapterException("Incorrect source URI: " +uriString);
-            }
-
-
-            List<Description> streams = new ArrayList<>();
-            Description d = new Description(ad.getName(), "", uri.toString());
-            d.setType("set");
-            streams.add(d);
-            DataSourceDescriptionHtml dsd = new DataSourceDescriptionHtml("Adapter Stream",
-                    "This stream is generated by an StreamPipes Connect adapter. ID of adapter: " + ad.getElementId(), uri.toString(), streams);
-            dsd.setType("source");
-            dsd.setAppId(ad.getAppId());
-            dsd.setEditable(!(ad.isInternallyManaged()));
-            allAdapterDescriptions.add(dsd);
-        }
-
-        JSONGenerator json = new JSONGenerator(allAdapterDescriptions);
-
-        return json.buildJson();
-    }
-
     private AdapterDescription getAdapterDescriptionById(String id) {
         AdapterDescription adapterDescription = null;
-        List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
+        List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
         for (AdapterDescription a : allAdapters) {
             if (a.getElementId().equals(id)) {
                 adapterDescription = a;
@@ -149,10 +114,7 @@ public class SourcesManagement {
         return decryptedAdapterDescription;
     }
 
-    public SpDataStream getAdapterDataStream(String id) throws AdapterException {
-
-        // get all Adapters and check id
-        AdapterDescription adapterDescription = getAdapterDescriptionById(id);
+    public SpDataStream createAdapterDataStream(AdapterDescription adapterDescription) {
 
         SpDataStream ds;
         if (adapterDescription instanceof AdapterSetDescription) {
@@ -169,9 +131,15 @@ public class SourcesManagement {
 
         ds.setName(adapterDescription.getName());
         ds.setDescription(adapterDescription.getDescription());
-        ds.setCorrespondingAdapterId(adapterDescription.getAppId());
+        ds.setCorrespondingAdapterId(adapterDescription.getElementId());
         ds.setInternallyManaged(true);
 
         return ds;
     }
+
+    private AdapterDescription getAndDecryptAdapter(String adapterId) {
+        AdapterDescription adapter = this.adapterInstanceStorage.getAdapter(adapterId);
+        return new AdapterEncryptionService(new Cloner().adapterDescription(adapter)).decrypt();
+    }
+
 }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 0026b60..2240797 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -65,9 +65,9 @@ public class WorkerRestClient {
         updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false);
     }
 
-    public static void invokeSetAdapter(String baseUrl,
+    public static void invokeSetAdapter(String endpointUrl,
                                         AdapterSetDescription adapterSetDescription) throws AdapterException {
-        String url = baseUrl + WorkerPaths.getSetInvokePath();
+        String url = endpointUrl + WorkerPaths.getSetInvokePath();
 
         startAdapter(url, adapterSetDescription);
     }
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
index 15572ee..f3af79b 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
@@ -17,6 +17,13 @@
  */
 package org.apache.streampipes.connect.container.master.util;
 
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
 public class WorkerPaths {
 
   private static final String WorkerMainPath = "/api/v1/worker";
@@ -49,7 +56,13 @@ public class WorkerPaths {
     return WorkerMainPath + "/guess/schema";
   }
 
-
+  public static String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException {
+    SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
+    String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl();
+    URI uri = new URI(endpointUrl);
+    String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
+    return baseUrl;
+  }
 
 
 }
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
index 00fadf9..fb9db01 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
@@ -35,7 +35,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.*;
@@ -61,7 +60,7 @@ public class SourcesManagementTest {
         SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
         doNothing().when(WorkerRestClient.class, "invokeSetAdapter", anyString(), any());
 
-        sourcesManagement.addAdapter(ID, new SpDataSet());
+        sourcesManagement.addSetAdapter(new SpDataSet());
 
         verify(adapterStorage, times(1)).getAllAdapters();
         verifyStatic(WorkerRestClient.class, times(1));
@@ -107,32 +106,6 @@ public class SourcesManagementTest {
         sourcesManagement.detachAdapter( ID, "id1");
     }
 
-    @Ignore
-    @Test
-    public void getAllAdaptersInstallDescriptionSuccess() throws Exception {
-
-        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
-        when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
-
-        SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-
-        String result = sourcesManagement.getAllAdaptersInstallDescription();
-        assertEquals(getJsonString(), result);
-
-    }
-
-    @Ignore
-    @Test(expected = AdapterException.class)
-    public void getAllAdaptersInstallDescriptionFail() throws Exception {
-        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
-        AdapterDescription adapterDescription = new GenericAdapterSetDescription();
-        when(adapterStorage.getAllAdapters()).thenReturn(Arrays.asList(adapterDescription));
-        SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-
-        sourcesManagement.getAllAdaptersInstallDescription();
-
-    }
-
     private List<AdapterDescription> getAdapterDescriptionList() {
         GenericAdapterSetDescription adapterSetDescription = new GenericAdapterSetDescription();
 
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
index 79c7607..e423590 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
@@ -105,11 +105,11 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
         try {
             adapterManagement.invokeSetAdapter(adapterSetDescription);
         } catch (AdapterException e) {
-            logger.error("Error while starting adapter with id " + adapterSetDescription.getUri(), e);
+            logger.error("Error while starting adapter with id " + adapterSetDescription.getElementId(), e);
             return ok(Notifications.error(e.getMessage()));
         }
 
-        String responseMessage = "Set adapter with id " + adapterSetDescription.getUri() + " successfully started";
+        String responseMessage = "Set adapter with id " + adapterSetDescription.getElementId() + " successfully started";
 
         logger.info(responseMessage);
         return ok(Notifications.success(responseMessage));
@@ -125,11 +125,11 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
         try {
              adapterManagement.stopSetAdapter(adapterSetDescription);
         } catch (AdapterException e) {
-            logger.error("Error while stopping adapter with id " + adapterSetDescription.getUri(), e);
+            logger.error("Error while stopping adapter with id " + adapterSetDescription.getElementId(), e);
             return ok(Notifications.error(e.getMessage()));
         }
 
-        String responseMessage = "Set adapter with id " + adapterSetDescription.getUri() + " successfully stopped";
+        String responseMessage = "Set adapter with id " + adapterSetDescription.getElementId() + " successfully stopped";
 
         logger.info(responseMessage);
         return ok(Notifications.success(responseMessage));
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index e481c14..444d54f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -31,8 +31,7 @@ import java.util.*;
 
 public class CouchDbInstallationStep implements InstallationStep {
 
-    private static List<String> initRdfEndpointPorts =
-            Collections.singletonList("8099/api/v1/master/sources/");
+    private static List<String> initRdfEndpointPorts = new ArrayList<>();
     private static final String initRdfEndpointHost = "http://localhost:";
 
     private static final String PREPARING_NOTIFICATIONS_TEXT = "Preparing database " +
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
index d5fb288..b792711 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
@@ -23,8 +23,6 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.container.master.management.SourcesManagement;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.message.Notifications;
-import org.apache.streampipes.rest.shared.annotation.GsonWithIds;
-import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,44 +39,17 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
         super(SourcesManagement::new);
     }
 
-    @GET
-    @Produces(MediaType.APPLICATION_JSON)
-    @GsonWithIds
-    public Response getAllAdaptersInstallDescription() {
-
-        try {
-            String resultingJson = managementService.getAllAdaptersInstallDescription();
-            return ok(resultingJson);
-        } catch (AdapterException e) {
-            LOG.error("Error while getting all adapter descriptions", e);
-            return fail();
-        }
-    }
-
-    @GET
-    @Path("/{id}")
-    @JacksonSerialized
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response getAdapterDataSource(@PathParam("id") String id) {
-        try {
-            return ok(managementService.getAdapterDataStream(id));
-        } catch (AdapterException e) {
-            LOG.error("Error while retrieving DataSourceDescription with id: " + id);
-            return fail();
-        }
-    }
-
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
     @Path("/{streamId}")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response addAdapter(@PathParam("streamId") String elementId,
+    public Response addSetAdapter(@PathParam("streamId") String streamId,
                                SpDataSet dataSet) {
 
         String responseMessage = "Instance of data set " + dataSet.getElementId() + " successfully started";
 
         try {
-            managementService.addAdapter(elementId,  dataSet);
+            managementService.addSetAdapter(dataSet);
         } catch (AdapterException | NoServiceEndpointsAvailableException e) {
             LOG.error("Could not set data set instance: " + dataSet.getElementId(), e);
             return ok(Notifications.error("Could not set data set instance: " + dataSet.getElementId()));
diff --git a/ui/cypress/tests/pipelineElement/AllPipelineElements.ts b/ui/cypress/tests/pipelineElement/AllPipelineElements.ts
index b3fdac4..70a2c05 100644
--- a/ui/cypress/tests/pipelineElement/AllPipelineElements.ts
+++ b/ui/cypress/tests/pipelineElement/AllPipelineElements.ts
@@ -18,7 +18,6 @@
 
 import { ProcessingElementTestUtils } from '../../support/utils/ProcessingElementTestUtils';
 import { ProcessorTest } from '../../support/model/ProcessorTest';
-import { AdapterUtils } from '../../support/utils/AdapterUtils';
 
 const allTests = Cypress.env('processingElements');