You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/11 13:00:19 UTC
[incubator-streampipes] 03/04: [STREAMPIPES-438] Fix data set
adapters
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 5897ade5217c449991659c93547b0533a068eb6a
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 11 14:45:21 2021 +0200
[STREAMPIPES-438] Fix data set adapters
---
.../master/management/AdapterMasterManagement.java | 46 ++++++-----
.../master/management/SourcesManagement.java | 90 +++++++---------------
.../master/management/WorkerRestClient.java | 4 +-
.../connect/container/master/util/WorkerPaths.java | 15 +++-
.../master/management/SourcesManagementTest.java | 29 +------
.../worker/rest/AdapterWorkerResource.java | 8 +-
.../manager/setup/CouchDbInstallationStep.java | 3 +-
.../rest/impl/connect/SourcesResource.java | 33 +-------
.../tests/pipelineElement/AllPipelineElements.ts | 1 -
9 files changed, 75 insertions(+), 154 deletions(-)
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index 5ada040..a6d28f6 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -23,7 +23,7 @@ import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.connect.adapter.GroundingService;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
-import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
import org.apache.streampipes.manager.storage.UserService;
import org.apache.streampipes.manager.verification.DataStreamVerifier;
import org.apache.streampipes.model.SpDataStream;
@@ -36,14 +36,13 @@ import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorageCache;
import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
import org.apache.streampipes.storage.management.StorageManager;
-import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.UUID;
+import java.util.stream.Collectors;
import static org.apache.streampipes.manager.storage.UserManagementService.getUserService;
@@ -92,7 +91,7 @@ public class AdapterMasterManagement {
}
// Create stream
- SpDataStream storedDescription = new SourcesManagement().getAdapterDataStream(ad.getElementId());
+ SpDataStream storedDescription = new SourcesManagement().createAdapterDataStream(ad);
storedDescription.setCorrespondingAdapterId(elementId);
installDataSource(storedDescription, username);
LOG.info("Install source (source URL: {} in backend", ad.getElementId());
@@ -116,37 +115,43 @@ public class AdapterMasterManagement {
}
/**
- * First the adapter is stopped removed, then the according data source is deleted
+ * First the adapter is stopped removed, then the corresponding data source is deleted
* @param elementId
* @throws AdapterException
*/
public void deleteAdapter(String elementId) throws AdapterException {
- // IF Stream adapter delete it
- boolean isStreamAdapter = isStreamAdapter(elementId);
- if (isStreamAdapter) {
+ // Stop stream adapter
+ if (isStreamAdapter(elementId)) {
try {
stopStreamAdapter(elementId);
} catch (AdapterException e) {
- LOG.info("Could not stop adapter: " + elementId);
- LOG.info(e.toString());
+ LOG.info("Could not stop adapter: " + elementId, e);
}
}
-
+ // Delete adapter
AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
- String username = ad.getUserName();
adapterInstanceStorage.deleteAdapter(elementId);
LOG.info("Successfully deleted adapter: " + elementId);
+ // Delete data stream
UserService userService = getUserService();
IPipelineElementDescriptionStorageCache requestor = StorageManager.INSTANCE.getPipelineElementStorage();
-
- if (requestor.getDataStreamById(ad.getElementId()) != null) {
- requestor.deleteDataStream(requestor.getDataStreamById(ad.getElementId()));
- userService.deleteOwnSource(username, ad.getElementId());
+ List<SpDataStream> streamsToDelete = requestor
+ .getAllDataStreams()
+ .stream()
+ .filter(spDataStream -> spDataStream.getCorrespondingAdapterId().equals(elementId))
+ .collect(Collectors.toList());
+ String username = ad.getUserName();
+ if (streamsToDelete.size() > 0) {
+ SpDataStream streamToDelete = streamsToDelete.get(0);
+ requestor.deleteDataStream(streamToDelete);
+ userService.deleteOwnSource(username, streamToDelete.getElementId());
requestor.refreshDataSourceCache();
+ LOG.info("Successfully deleted data stream: " + streamToDelete.getElementId());
}
+
}
public List<AdapterDescription> getAllAdapterInstances() throws AdapterException {
@@ -200,7 +205,7 @@ public class AdapterMasterManagement {
try {
// Find endpoint to start adapter on
- String baseUrl = findEndpointUrl(ad.getAppId());
+ String baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId());
// Update selected endpoint URL of adapter
ad.setSelectedEndpointUrl(baseUrl);
@@ -242,11 +247,4 @@ public class AdapterMasterManagement {
return new AdapterInstanceStorageImpl();
}
- private String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException {
- SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
- String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl();
- URI uri = new URI(endpointUrl);
- String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
- return baseUrl;
- }
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index ee54fdd..8e3e284 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -22,9 +22,7 @@ import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableExce
import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.util.AdapterEncryptionService;
-import org.apache.streampipes.container.html.JSONGenerator;
-import org.apache.streampipes.container.html.model.DataSourceDescriptionHtml;
-import org.apache.streampipes.container.html.model.Description;
+import org.apache.streampipes.connect.container.master.util.WorkerPaths;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -37,9 +35,7 @@ import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -47,11 +43,11 @@ public class SourcesManagement {
private Logger logger = LoggerFactory.getLogger(SourcesManagement.class);
- private AdapterInstanceStorageImpl adapterStorage;
+ private AdapterInstanceStorageImpl adapterInstanceStorage;
private WorkerUrlProvider workerUrlProvider;
public SourcesManagement(AdapterInstanceStorageImpl adapterStorage) {
- this.adapterStorage = adapterStorage;
+ this.adapterInstanceStorage = adapterStorage;
this.workerUrlProvider = new WorkerUrlProvider();
}
@@ -59,23 +55,26 @@ public class SourcesManagement {
this(new AdapterInstanceStorageImpl());
}
- public void addAdapter(String streamId,
- SpDataSet dataSet) throws AdapterException, NoServiceEndpointsAvailableException {
+ public void addSetAdapter(SpDataSet dataSet) throws AdapterException, NoServiceEndpointsAvailableException {
+ AdapterSetDescription ad = (AdapterSetDescription) getAndDecryptAdapter(dataSet.getCorrespondingAdapterId());
+ ad.setDataSet(dataSet);
+ ad.setElementId(ad.getElementId() + "/streams/" + dataSet.getDatasetInvocationId());
- String newUrl = getAdapterUrl(streamId);
- AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
- adapterDescription.setDataSet(dataSet);
-
- String newId = adapterDescription.getElementId() + "/streams/" + dataSet.getDatasetInvocationId();
- adapterDescription.setElementId(newId);
-
- AdapterSetDescription decryptedAdapterDescription =
- (AdapterSetDescription) new Cloner().adapterDescription(adapterDescription);
-
- WorkerRestClient.invokeSetAdapter(newUrl, decryptedAdapterDescription);
+ try {
+ String baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId());
+ WorkerRestClient.invokeSetAdapter(baseUrl, ad);
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ }
}
+ /**
+ * @param streamId
+ * @param runningInstanceId
+ * @throws AdapterException
+ * @throws NoServiceEndpointsAvailableException
+ */
public void detachAdapter(String streamId,
String runningInstanceId) throws AdapterException, NoServiceEndpointsAvailableException {
AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
@@ -89,7 +88,7 @@ public class SourcesManagement {
private String getAdapterUrl(String streamId) throws NoServiceEndpointsAvailableException {
String appId = "";
- List<AdapterDescription> adapterDescriptions = this.adapterStorage.getAllAdapters();
+ List<AdapterDescription> adapterDescriptions = this.adapterInstanceStorage.getAllAdapters();
for (AdapterDescription ad : adapterDescriptions) {
if (ad.getElementId().contains(streamId)) {
appId = ad.getAppId();
@@ -100,43 +99,9 @@ public class SourcesManagement {
}
- public String getAllAdaptersInstallDescription() throws AdapterException {
-
- List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
- List<Description> allAdapterDescriptions = new ArrayList<>();
-
- for (AdapterDescription ad : allAdapters) {
- URI uri;
- String uriString = null;
- try {
- uriString = ad.getElementId();
- uri = new URI(uriString);
- } catch (URISyntaxException e) {
- logger.error("URI for the sources endpoint is not correct: " + uriString, e);
- throw new AdapterException("Incorrect source URI: " +uriString);
- }
-
-
- List<Description> streams = new ArrayList<>();
- Description d = new Description(ad.getName(), "", uri.toString());
- d.setType("set");
- streams.add(d);
- DataSourceDescriptionHtml dsd = new DataSourceDescriptionHtml("Adapter Stream",
- "This stream is generated by an StreamPipes Connect adapter. ID of adapter: " + ad.getElementId(), uri.toString(), streams);
- dsd.setType("source");
- dsd.setAppId(ad.getAppId());
- dsd.setEditable(!(ad.isInternallyManaged()));
- allAdapterDescriptions.add(dsd);
- }
-
- JSONGenerator json = new JSONGenerator(allAdapterDescriptions);
-
- return json.buildJson();
- }
-
private AdapterDescription getAdapterDescriptionById(String id) {
AdapterDescription adapterDescription = null;
- List<AdapterDescription> allAdapters = adapterStorage.getAllAdapters();
+ List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
for (AdapterDescription a : allAdapters) {
if (a.getElementId().equals(id)) {
adapterDescription = a;
@@ -149,10 +114,7 @@ public class SourcesManagement {
return decryptedAdapterDescription;
}
- public SpDataStream getAdapterDataStream(String id) throws AdapterException {
-
- // get all Adapters and check id
- AdapterDescription adapterDescription = getAdapterDescriptionById(id);
+ public SpDataStream createAdapterDataStream(AdapterDescription adapterDescription) {
SpDataStream ds;
if (adapterDescription instanceof AdapterSetDescription) {
@@ -169,9 +131,15 @@ public class SourcesManagement {
ds.setName(adapterDescription.getName());
ds.setDescription(adapterDescription.getDescription());
- ds.setCorrespondingAdapterId(adapterDescription.getAppId());
+ ds.setCorrespondingAdapterId(adapterDescription.getElementId());
ds.setInternallyManaged(true);
return ds;
}
+
+ private AdapterDescription getAndDecryptAdapter(String adapterId) {
+ AdapterDescription adapter = this.adapterInstanceStorage.getAdapter(adapterId);
+ return new AdapterEncryptionService(new Cloner().adapterDescription(adapter)).decrypt();
+ }
+
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 0026b60..2240797 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -65,9 +65,9 @@ public class WorkerRestClient {
updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false);
}
- public static void invokeSetAdapter(String baseUrl,
+ public static void invokeSetAdapter(String endpointUrl,
AdapterSetDescription adapterSetDescription) throws AdapterException {
- String url = baseUrl + WorkerPaths.getSetInvokePath();
+ String url = endpointUrl + WorkerPaths.getSetInvokePath();
startAdapter(url, adapterSetDescription);
}
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
index 15572ee..f3af79b 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/util/WorkerPaths.java
@@ -17,6 +17,13 @@
*/
package org.apache.streampipes.connect.container.master.util;
+import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
+import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
public class WorkerPaths {
private static final String WorkerMainPath = "/api/v1/worker";
@@ -49,7 +56,13 @@ public class WorkerPaths {
return WorkerMainPath + "/guess/schema";
}
-
+ public static String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException {
+ SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
+ String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl();
+ URI uri = new URI(endpointUrl);
+ String baseUrl = uri.getScheme() + "://" + uri.getAuthority();
+ return baseUrl;
+ }
}
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
index 00fadf9..fb9db01 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
@@ -35,7 +35,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Arrays;
import java.util.List;
-import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
@@ -61,7 +60,7 @@ public class SourcesManagementTest {
SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
doNothing().when(WorkerRestClient.class, "invokeSetAdapter", anyString(), any());
- sourcesManagement.addAdapter(ID, new SpDataSet());
+ sourcesManagement.addSetAdapter(new SpDataSet());
verify(adapterStorage, times(1)).getAllAdapters();
verifyStatic(WorkerRestClient.class, times(1));
@@ -107,32 +106,6 @@ public class SourcesManagementTest {
sourcesManagement.detachAdapter( ID, "id1");
}
- @Ignore
- @Test
- public void getAllAdaptersInstallDescriptionSuccess() throws Exception {
-
- AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
- when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
-
- SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-
- String result = sourcesManagement.getAllAdaptersInstallDescription();
- assertEquals(getJsonString(), result);
-
- }
-
- @Ignore
- @Test(expected = AdapterException.class)
- public void getAllAdaptersInstallDescriptionFail() throws Exception {
- AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
- AdapterDescription adapterDescription = new GenericAdapterSetDescription();
- when(adapterStorage.getAllAdapters()).thenReturn(Arrays.asList(adapterDescription));
- SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-
- sourcesManagement.getAllAdaptersInstallDescription();
-
- }
-
private List<AdapterDescription> getAdapterDescriptionList() {
GenericAdapterSetDescription adapterSetDescription = new GenericAdapterSetDescription();
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
index 79c7607..e423590 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/AdapterWorkerResource.java
@@ -105,11 +105,11 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
try {
adapterManagement.invokeSetAdapter(adapterSetDescription);
} catch (AdapterException e) {
- logger.error("Error while starting adapter with id " + adapterSetDescription.getUri(), e);
+ logger.error("Error while starting adapter with id " + adapterSetDescription.getElementId(), e);
return ok(Notifications.error(e.getMessage()));
}
- String responseMessage = "Set adapter with id " + adapterSetDescription.getUri() + " successfully started";
+ String responseMessage = "Set adapter with id " + adapterSetDescription.getElementId() + " successfully started";
logger.info(responseMessage);
return ok(Notifications.success(responseMessage));
@@ -125,11 +125,11 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface {
try {
adapterManagement.stopSetAdapter(adapterSetDescription);
} catch (AdapterException e) {
- logger.error("Error while stopping adapter with id " + adapterSetDescription.getUri(), e);
+ logger.error("Error while stopping adapter with id " + adapterSetDescription.getElementId(), e);
return ok(Notifications.error(e.getMessage()));
}
- String responseMessage = "Set adapter with id " + adapterSetDescription.getUri() + " successfully stopped";
+ String responseMessage = "Set adapter with id " + adapterSetDescription.getElementId() + " successfully stopped";
logger.info(responseMessage);
return ok(Notifications.success(responseMessage));
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index e481c14..444d54f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -31,8 +31,7 @@ import java.util.*;
public class CouchDbInstallationStep implements InstallationStep {
- private static List<String> initRdfEndpointPorts =
- Collections.singletonList("8099/api/v1/master/sources/");
+ private static List<String> initRdfEndpointPorts = new ArrayList<>();
private static final String initRdfEndpointHost = "http://localhost:";
private static final String PREPARING_NOTIFICATIONS_TEXT = "Preparing database " +
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
index d5fb288..b792711 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
@@ -23,8 +23,6 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.management.SourcesManagement;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.message.Notifications;
-import org.apache.streampipes.rest.shared.annotation.GsonWithIds;
-import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,44 +39,17 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
super(SourcesManagement::new);
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @GsonWithIds
- public Response getAllAdaptersInstallDescription() {
-
- try {
- String resultingJson = managementService.getAllAdaptersInstallDescription();
- return ok(resultingJson);
- } catch (AdapterException e) {
- LOG.error("Error while getting all adapter descriptions", e);
- return fail();
- }
- }
-
- @GET
- @Path("/{id}")
- @JacksonSerialized
- @Produces(MediaType.APPLICATION_JSON)
- public Response getAdapterDataSource(@PathParam("id") String id) {
- try {
- return ok(managementService.getAdapterDataStream(id));
- } catch (AdapterException e) {
- LOG.error("Error while retrieving DataSourceDescription with id: " + id);
- return fail();
- }
- }
-
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/{streamId}")
@Produces(MediaType.APPLICATION_JSON)
- public Response addAdapter(@PathParam("streamId") String elementId,
+ public Response addSetAdapter(@PathParam("streamId") String streamId,
SpDataSet dataSet) {
String responseMessage = "Instance of data set " + dataSet.getElementId() + " successfully started";
try {
- managementService.addAdapter(elementId, dataSet);
+ managementService.addSetAdapter(dataSet);
} catch (AdapterException | NoServiceEndpointsAvailableException e) {
LOG.error("Could not set data set instance: " + dataSet.getElementId(), e);
return ok(Notifications.error("Could not set data set instance: " + dataSet.getElementId()));
diff --git a/ui/cypress/tests/pipelineElement/AllPipelineElements.ts b/ui/cypress/tests/pipelineElement/AllPipelineElements.ts
index b3fdac4..70a2c05 100644
--- a/ui/cypress/tests/pipelineElement/AllPipelineElements.ts
+++ b/ui/cypress/tests/pipelineElement/AllPipelineElements.ts
@@ -18,7 +18,6 @@
import { ProcessingElementTestUtils } from '../../support/utils/ProcessingElementTestUtils';
import { ProcessorTest } from '../../support/model/ProcessorTest';
-import { AdapterUtils } from '../../support/utils/AdapterUtils';
const allTests = Cypress.env('processingElements');