You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/11 14:26:07 UTC

[incubator-streampipes] branch STREAMPIPES-438 updated (3bdfe1d -> 3e4aec4)

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a change to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from 3bdfe1d  [STREAMPIPES-438] Remove data stream source id from backend
     new 68d110d  [STREAMPIPES-438] Fix store in datalake
     new 3e4aec4  [STREAMPIPES-438] Harmonize Model Submitter

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../master/management/AdapterMasterManagement.java | 10 ----
 .../master/management/SourcesManagement.java       | 54 +++++-----------------
 .../master/management/WorkerRestClient.java        |  2 +-
 .../master/management/SourcesManagementTest.java   | 39 ----------------
 .../manager/execution/http/GraphSubmitter.java     |  2 +-
 .../streampipes/rest/impl/PipelineTemplate.java    |  2 +-
 .../rest/impl/connect/SourcesResource.java         |  9 ++--
 ui/cypress/tests/adapter/InfluxDbAdapter.ts        |  4 +-
 ui/cypress/tests/adapter/KafkaAdapter.ts           |  3 +-
 ui/cypress/tests/adapter/MqttAdapter.ts            |  3 +-
 ui/cypress/tests/adapter/MySQLDbAdapter.ts         |  5 +-
 ui/cypress/tests/adapter/machineDataSimulator.ts   | 14 +-----
 ui/cypress/tests/adapter/persistInDataLake.ts      | 18 +++++---
 13 files changed, 36 insertions(+), 129 deletions(-)

[incubator-streampipes] 01/02: [STREAMPIPES-438] Fix store in datalake

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 68d110ddb818a6dbb3d1845456bbbdb6951711df
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 11 15:05:22 2021 +0200

    [STREAMPIPES-438] Fix store in datalake
---
 .../main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java    | 2 +-
 ui/cypress/tests/adapter/fileStream.ts                                  | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
index 5cdf6e6..9508d7d 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
@@ -124,7 +124,7 @@ public class PipelineTemplate extends AbstractAuthGuardedRestResource {
   private SpDataStream getDataStream(String streamId) {
     return getAllDataStreams()
             .stream()
-            .filter(sp -> sp.getCorrespondingAdapterId().equals(streamId))
+            .filter(sp -> sp.getElementId().equals(streamId))
             .findFirst()
             .get();
   }
diff --git a/ui/cypress/tests/adapter/fileStream.ts b/ui/cypress/tests/adapter/fileStream.ts
index 2c84ae3..22ebcc5 100644
--- a/ui/cypress/tests/adapter/fileStream.ts
+++ b/ui/cypress/tests/adapter/fileStream.ts
@@ -31,6 +31,7 @@ describe('Test File Stream Adapter', () => {
       .create('File_Stream')
       .setName('File Stream Adapter Test')
       .setTimestampProperty('timestamp')
+      .setStoreInDataLake()
       .addProtocolInput('input', 'speed', '1')
       .addProtocolInput('checkbox', 'replaceTimestamp', 'check')
       .setFormat('csv')

[incubator-streampipes] 02/02: [STREAMPIPES-438] Harmonize Model Submitter

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-438
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 3e4aec4515ff967a75b6023d446f9ef6bfb01639
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 11 16:25:38 2021 +0200

    [STREAMPIPES-438] Harmonize Model Submitter
---
 .../master/management/AdapterMasterManagement.java | 10 ----
 .../master/management/SourcesManagement.java       | 54 +++++-----------------
 .../master/management/WorkerRestClient.java        |  2 +-
 .../master/management/SourcesManagementTest.java   | 39 ----------------
 .../manager/execution/http/GraphSubmitter.java     |  2 +-
 .../rest/impl/connect/SourcesResource.java         |  9 ++--
 ui/cypress/tests/adapter/InfluxDbAdapter.ts        |  4 +-
 ui/cypress/tests/adapter/KafkaAdapter.ts           |  3 +-
 ui/cypress/tests/adapter/MqttAdapter.ts            |  3 +-
 ui/cypress/tests/adapter/MySQLDbAdapter.ts         |  5 +-
 ui/cypress/tests/adapter/fileStream.ts             |  1 -
 ui/cypress/tests/adapter/machineDataSimulator.ts   | 14 +-----
 ui/cypress/tests/adapter/persistInDataLake.ts      | 18 +++++---
 13 files changed, 35 insertions(+), 129 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
index a6d28f6..f7ed6bb 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/AdapterMasterManagement.java
@@ -28,7 +28,6 @@ import org.apache.streampipes.manager.storage.UserService;
 import org.apache.streampipes.manager.verification.DataStreamVerifier;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.model.connect.adapter.AdapterSetDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.util.Cloner;
@@ -176,15 +175,6 @@ public class AdapterMasterManagement {
     return allAdapters;
   }
 
-  public void stopSetAdapter(String elementId,
-                             String baseUrl,
-                             AdapterInstanceStorageImpl adapterStorage) throws AdapterException {
-
-    AdapterSetDescription ad = (AdapterSetDescription) adapterStorage.getAdapter(elementId);
-
-    WorkerRestClient.stopSetAdapter(baseUrl, ad);
-  }
-
   public void stopStreamAdapter(String elementId) throws AdapterException {
     AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
 
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
index 8e3e284..1a71453 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/SourcesManagement.java
@@ -37,18 +37,15 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
 import java.util.Arrays;
-import java.util.List;
 
 public class SourcesManagement {
 
     private Logger logger = LoggerFactory.getLogger(SourcesManagement.class);
 
     private AdapterInstanceStorageImpl adapterInstanceStorage;
-    private WorkerUrlProvider workerUrlProvider;
 
     public SourcesManagement(AdapterInstanceStorageImpl adapterStorage) {
       this.adapterInstanceStorage = adapterStorage;
-      this.workerUrlProvider = new WorkerUrlProvider();
     }
 
     public SourcesManagement() {
@@ -70,48 +67,19 @@ public class SourcesManagement {
     }
 
     /**
-     * @param streamId
      * @param runningInstanceId
      * @throws AdapterException
      * @throws NoServiceEndpointsAvailableException
      */
-    public void detachAdapter(String streamId,
-                              String runningInstanceId) throws AdapterException, NoServiceEndpointsAvailableException {
-        AdapterSetDescription adapterDescription = (AdapterSetDescription) getAdapterDescriptionById(streamId);
-
-        String newId = adapterDescription.getElementId() + "/streams/" + runningInstanceId;
-        adapterDescription.setElementId(newId);
-
-        String newUrl = getAdapterUrl(streamId);
-        WorkerRestClient.stopSetAdapter(newUrl, adapterDescription);
-    }
-
-    private String getAdapterUrl(String streamId) throws NoServiceEndpointsAvailableException {
-        String appId = "";
-        List<AdapterDescription> adapterDescriptions = this.adapterInstanceStorage.getAllAdapters();
-        for (AdapterDescription ad : adapterDescriptions) {
-            if (ad.getElementId().contains(streamId)) {
-                appId = ad.getAppId();
-            }
-        }
-
-        return workerUrlProvider.getWorkerBaseUrl(appId);
-
-    }
-
-    private AdapterDescription getAdapterDescriptionById(String id) {
-        AdapterDescription adapterDescription = null;
-        List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
-        for (AdapterDescription a : allAdapters) {
-            if (a.getElementId().equals(id)) {
-                adapterDescription = a;
-            }
+    public void detachAdapter(String elementId, String runningInstanceId) throws AdapterException, NoServiceEndpointsAvailableException {
+        AdapterSetDescription ad = (AdapterSetDescription) getAndDecryptAdapter(elementId);
+        try {
+            String baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId());
+            ad.setElementId(ad.getElementId() + "/streams/" + runningInstanceId);
+            WorkerRestClient.stopSetAdapter(baseUrl, ad);
+        } catch (URISyntaxException e) {
+            e.printStackTrace();
         }
-        AdapterDescription decryptedAdapterDescription =
-                new AdapterEncryptionService(new Cloner()
-                        .adapterDescription(adapterDescription)).decrypt();
-
-        return decryptedAdapterDescription;
     }
 
     public SpDataStream createAdapterDataStream(AdapterDescription adapterDescription) {
@@ -120,8 +88,10 @@ public class SourcesManagement {
         if (adapterDescription instanceof AdapterSetDescription) {
             ds = ((AdapterSetDescription) adapterDescription).getDataSet();
             EventGrounding eg = new EventGrounding();
-            eg.setTransportProtocols(Arrays.asList(SupportedProtocols.kafka(), SupportedProtocols.jms(),
-                    SupportedProtocols.mqtt()));
+            eg.setTransportProtocols(
+                    Arrays.asList(SupportedProtocols.kafka(),
+                            SupportedProtocols.jms(),
+                            SupportedProtocols.mqtt()));
             eg.setTransportFormats(Arrays.asList(TransportFormatGenerator.getTransportFormat()));
             ((SpDataSet) ds).setSupportedGrounding(eg);
         } else {
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
index 2240797..74bdfe2 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerRestClient.java
@@ -124,7 +124,7 @@ public class WorkerRestClient {
 
         // Stop execution of adapter
         try {
-            logger.info("Trying to stopAdapter adapter on endpoint: " + url);
+            logger.info("Trying to stop adapter on endpoint: " + url);
 
             String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(ad);
 
diff --git a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
index fb9db01..d9feb52 100644
--- a/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
+++ b/streampipes-connect-container-master/src/test/java/org/apache/streampipes/connect/container/master/management/SourcesManagementTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.connect.container.master.management;
 
-import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
@@ -68,44 +67,6 @@ public class SourcesManagementTest {
 
     }
 
-    @Ignore
-    @Test(expected = AdapterException.class)
-    public void addAdapterFail() throws Exception {
-        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
-        when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
-        SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-
-        org.powermock.api.mockito.PowerMockito.doThrow(new AdapterException()).when(WorkerRestClient.class, "stopSetAdapter", anyString(), any());
-
-        sourcesManagement.detachAdapter( ID, "id1");
-    }
-
-    @Ignore
-    @Test
-    public void detachAdapterSuccess() throws Exception {
-        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
-        when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
-        SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-        doNothing().when(WorkerRestClient.class, "stopSetAdapter", anyString(), any());
-
-        sourcesManagement.detachAdapter(ID, "id1");
-
-        verify(adapterStorage, times(1)).getAllAdapters();
-        verifyStatic(WorkerRestClient.class, times(1));
-        WorkerRestClient.stopSetAdapter(eq("/"), any());
-    }
-
-    @Ignore
-    @Test(expected = AdapterException.class)
-    public void detachAdapterFail() throws Exception {
-        AdapterInstanceStorageImpl adapterStorage = mock(AdapterInstanceStorageImpl.class);
-        when(adapterStorage.getAllAdapters()).thenReturn(getAdapterDescriptionList());
-        SourcesManagement sourcesManagement = new SourcesManagement(adapterStorage);
-        org.powermock.api.mockito.PowerMockito.doThrow(new AdapterException()).when(WorkerRestClient.class, "stopSetAdapter", anyString(), any());
-
-        sourcesManagement.detachAdapter( ID, "id1");
-    }
-
     private List<AdapterDescription> getAdapterDescriptionList() {
         GenericAdapterSetDescription adapterSetDescription = new GenericAdapterSetDescription();
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
index 8074b4d..a215f6c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/GraphSubmitter.java
@@ -124,7 +124,7 @@ public class GraphSubmitter {
   }
 
   private PipelineElementStatus performDetach(SpDataSet dataset) {
-    String endpointUrl = dataset.getSelectedEndpointUrl() + "/" + dataset.getDatasetInvocationId();
+    String endpointUrl = dataset.getSelectedEndpointUrl() + "/" + dataset.getCorrespondingAdapterId() + "/" + dataset.getDatasetInvocationId();
     return new HttpRequestBuilder(dataset, endpointUrl).detach();
   }
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
index 00293d4..1729e31 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/SourcesResource.java
@@ -58,16 +58,15 @@ public class SourcesResource extends AbstractAdapterResource<SourcesManagement>
     }
 
     @DELETE
-    @Path("/{streamId}/{runningInstanceId}")
+    @Path("{adapterId}/{runningInstanceId}")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response detach(@PathParam("streamId") String elementId,
-                           @PathParam("runningInstanceId") String runningInstanceId) {
-        String responseMessage = "Instance of set id: " + elementId  + " with instance id: "+ runningInstanceId + " successfully started";
+    public Response detach(@PathParam("adapterId") String elementId, @PathParam("runningInstanceId") String runningInstanceId) {
+        String responseMessage = "Instance id: "+ runningInstanceId + " successfully started";
 
         try {
             managementService.detachAdapter(elementId, runningInstanceId);
         } catch (AdapterException | NoServiceEndpointsAvailableException e) {
-            LOG.error("Could not set set id "+ elementId  + " with instance id: "+ runningInstanceId, e);
+            LOG.error("Could not detach instance id: "+ runningInstanceId, e);
             return fail();
         }
 
diff --git a/ui/cypress/tests/adapter/InfluxDbAdapter.ts b/ui/cypress/tests/adapter/InfluxDbAdapter.ts
index d1018ce..48a0400 100644
--- a/ui/cypress/tests/adapter/InfluxDbAdapter.ts
+++ b/ui/cypress/tests/adapter/InfluxDbAdapter.ts
@@ -22,7 +22,7 @@ import { PipelineElementBuilder } from '../../support/builder/PipelineElementBui
 import { AdapterUtils } from '../../support/utils/AdapterUtils';
 import { PipelineUtils } from '../../support/utils/PipelineUtils';
 
-describe('Test Random Data Simulator Stream Adapter', () => {
+describe('Test InfluxDB Integration', () => {
   before('Setup Test', () => {
     cy.initStreamPipesTest();
   });
@@ -59,8 +59,6 @@ describe('Test Random Data Simulator Stream Adapter', () => {
     const adapterInput = SpecificAdapterBuilder
       .create('InfluxDB_Stream_Adapter')
       .setName('InfluxDB Adapter')
-      .setTimestampProperty('time')
-      .setStoreInDataLake()
       .addInput('input', 'influxDbHost', 'http://localhost')
       .addInput('input', 'influxDbPort', '8086')
       .addInput('input', 'influxDbDatabase', 'sp')
diff --git a/ui/cypress/tests/adapter/KafkaAdapter.ts b/ui/cypress/tests/adapter/KafkaAdapter.ts
index 089ff23..1125c73 100644
--- a/ui/cypress/tests/adapter/KafkaAdapter.ts
+++ b/ui/cypress/tests/adapter/KafkaAdapter.ts
@@ -23,7 +23,7 @@ import { PipelineBuilder } from '../../support/builder/PipelineBuilder';
 import { PipelineElementBuilder } from '../../support/builder/PipelineElementBuilder';
 import { PipelineUtils } from '../../support/utils/PipelineUtils';
 
-describe('Test Random Data Simulator Stream Adapter', () => {
+describe('Test Kafka Integration', () => {
   before('Setup Test', () => {
     cy.initStreamPipesTest();
   });
@@ -58,7 +58,6 @@ describe('Test Random Data Simulator Stream Adapter', () => {
       .create('Apache_Kafka')
       .setName('Kafka4')
       .setTimestampProperty('timestamp')
-      .setStoreInDataLake()
       .addProtocolInput('select', 'Unauthenticated', 'check')
       .addProtocolInput('input', 'host', 'localhost')
       .addProtocolInput('input', 'port', '9094')
diff --git a/ui/cypress/tests/adapter/MqttAdapter.ts b/ui/cypress/tests/adapter/MqttAdapter.ts
index 1fdd730..c5ecf4c 100644
--- a/ui/cypress/tests/adapter/MqttAdapter.ts
+++ b/ui/cypress/tests/adapter/MqttAdapter.ts
@@ -23,7 +23,7 @@ import { PipelineBuilder } from '../../support/builder/PipelineBuilder';
 import { PipelineElementBuilder } from '../../support/builder/PipelineElementBuilder';
 import { PipelineUtils } from '../../support/utils/PipelineUtils';
 
-describe('Test Random Data Simulator Stream Adapter', () => {
+describe('Test MQTT Integration', () => {
   before('Setup Test', () => {
     cy.initStreamPipesTest();
   });
@@ -56,7 +56,6 @@ describe('Test Random Data Simulator Stream Adapter', () => {
       .create('MQTT')
       .setName('Adapter Mqtt')
       .setTimestampProperty('timestamp')
-      .setStoreInDataLake()
       .addProtocolInput('select', 'Unauthenticated', 'check')
       .addProtocolInput('input', 'broker_url', 'tcp://localhost:1883')
       .addProtocolInput('input', 'topic', topicname)
diff --git a/ui/cypress/tests/adapter/MySQLDbAdapter.ts b/ui/cypress/tests/adapter/MySQLDbAdapter.ts
index 39f2cb1..0055e68 100644
--- a/ui/cypress/tests/adapter/MySQLDbAdapter.ts
+++ b/ui/cypress/tests/adapter/MySQLDbAdapter.ts
@@ -22,7 +22,7 @@ import { PipelineElementBuilder } from '../../support/builder/PipelineElementBui
 import { AdapterUtils } from '../../support/utils/AdapterUtils';
 import { PipelineUtils } from '../../support/utils/PipelineUtils';
 
-describe('Test Random Data Simulator Stream Adapter', () => {
+describe('Test MySQL Integration', () => {
   before('Setup Test', () => {
     cy.initStreamPipesTest();
   });
@@ -32,7 +32,7 @@ describe('Test Random Data Simulator Stream Adapter', () => {
 
     AdapterUtils.addMachineDataSimulator(simulatorAdapterName);
 
-    const topicname = 'cypresstopic';
+    const topicname = 'cypresstopic1';
     const pipelineInput = PipelineBuilder.create('Pipeline Test')
       .addSource(simulatorAdapterName)
       .addSink(
@@ -51,7 +51,6 @@ describe('Test Random Data Simulator Stream Adapter', () => {
       .create('MySql_Stream_Adapter')
       .setName('MySQL Adapter')
       .setTimestampProperty('timestamp')
-      .setStoreInDataLake()
       .addInput('input', 'mysqlHost', 'localhost')
       .addInput('input', 'mysqlUser', 'root')
       .addInput('input', 'mysqlPassword', '7uc4rAymrPhxv6a5')
diff --git a/ui/cypress/tests/adapter/fileStream.ts b/ui/cypress/tests/adapter/fileStream.ts
index 22ebcc5..2c84ae3 100644
--- a/ui/cypress/tests/adapter/fileStream.ts
+++ b/ui/cypress/tests/adapter/fileStream.ts
@@ -31,7 +31,6 @@ describe('Test File Stream Adapter', () => {
       .create('File_Stream')
       .setName('File Stream Adapter Test')
       .setTimestampProperty('timestamp')
-      .setStoreInDataLake()
       .addProtocolInput('input', 'speed', '1')
       .addProtocolInput('checkbox', 'replaceTimestamp', 'check')
       .setFormat('csv')
diff --git a/ui/cypress/tests/adapter/machineDataSimulator.ts b/ui/cypress/tests/adapter/machineDataSimulator.ts
index f2849f6..6ff206f 100644
--- a/ui/cypress/tests/adapter/machineDataSimulator.ts
+++ b/ui/cypress/tests/adapter/machineDataSimulator.ts
@@ -29,23 +29,11 @@ describe('Test Random Data Simulator Stream Adapter', () => {
       .create('Machine_Data_Simulator')
       .setName('Machine Data Simulator Test')
       .addInput('input', 'wait-time-ms', '1000')
-      // .setTimestampProperty('Timestamp')
-      // .setStoreInDataLake()
       .build();
 
     AdapterUtils.testSpecificStreamAdapter(adapterInput);
+    AdapterUtils.deleteAdapter();
 
-    // const adapterInput1 = SpecificAdapterBuilder
-    //   .create('Machine_Data_Simulator')
-    //   .setName('Machine Data Simulator Test 2')
-    //   .addInput('input', 'wait-time-ms', '1000')
-    //   .addInput('radio', 'selected-simulator-option', 'pressure')
-    //   .setTimestampProperty('timestamp')
-    //   .setStoreInDataLake()
-    //   .build();
-    //
-    // AdapterUtils.testSpecificStreamAdapter(adapterInput1);
-    // AdapterUtils.deleteAdapter();
   });
 
 });
diff --git a/ui/cypress/tests/adapter/persistInDataLake.ts b/ui/cypress/tests/adapter/persistInDataLake.ts
index 6745cf4..5ef07b9 100644
--- a/ui/cypress/tests/adapter/persistInDataLake.ts
+++ b/ui/cypress/tests/adapter/persistInDataLake.ts
@@ -17,8 +17,8 @@
  */
 
 import { AdapterUtils } from '../../support/utils/AdapterUtils';
-import { SpecificAdapterBuilder } from '../../support/builder/SpecificAdapterBuilder';
 import { PipelineUtils } from '../../support/utils/PipelineUtils';
+import { GenericAdapterBuilder } from '../../support/builder/GenericAdapterBuilder';
 
 describe('Test File Stream Adapter', () => {
   before('Setup Test', () => {
@@ -27,15 +27,19 @@ describe('Test File Stream Adapter', () => {
 
   it('Perform Test', () => {
 
-    const adapterInput = SpecificAdapterBuilder
-      .create('Machine_Data_Simulator')
-      .setName('Machine Data Simulator Test')
-      .addInput('input', 'wait-time-ms', '1000')
-      .setTimestampProperty('Timestamp')
+    const adapterInput = GenericAdapterBuilder
+      .create('File_Stream')
+      .setName('File Stream Adapter Test')
+      .setTimestampProperty('timestamp')
       .setStoreInDataLake()
+      .addProtocolInput('input', 'speed', '1')
+      .addProtocolInput('checkbox', 'replaceTimestamp', 'check')
+      .setFormat('csv')
+      .addFormatInput('input', 'delimiter', ';')
+      .addFormatInput('checkbox', 'header', 'check')
       .build();
 
-    AdapterUtils.testSpecificStreamAdapter(adapterInput);
+    AdapterUtils.testGenericStreamAdapter(adapterInput);
     PipelineUtils.checkAmountOfPipelinesPipeline(1);
   });