You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/01/23 09:55:52 UTC

[streampipes] branch SP-1132 updated (dd6c07535 -> 73422d4a4)

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a change to branch SP-1132
in repository https://gitbox.apache.org/repos/asf/streampipes.git


    from dd6c07535 [wip] Remove set adapters from e2e tests
     add 37be222bb [#1121] Add option to fast replay a file to FileStreamProtocol
     add 469513753 [#1121] Handle cases where timestamp is not part of FileStreamProtocol events
     new 716e98ddf Merge branch 'SP-1121' into SP-1132
     new ddb280fcf [#1132] Fix preprocessing adapter rules
     new 73422d4a4 [#1132] Remove set adapters from e2e tests

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../TransformValueAdapterPipelineElement.java      |  4 ++
 .../transform/value/ValueEventTransformer.java     | 11 ++-
 .../iiot/protocol/stream/FileStreamProtocol.java   | 83 +++++++++++++++++++++-
 .../strings.en                                     | 15 +++-
 .../fixtures/connect/aggregationRules/expected.csv |  3 +-
 .../fixtures/connect/aggregationRules/input.csv    | 13 ++--
 ui/cypress/fixtures/connect/schemaRules/input.csv  |  4 +-
 .../fixtures/connect/valueRules/expected.csv       |  2 +-
 ui/cypress/fixtures/connect/valueRules/input.csv   |  2 +-
 .../support/builder/GenericAdapterBuilder.ts       |  2 +-
 .../support/utils/DataDownloadDialogUtils.ts       |  2 -
 ui/cypress/support/utils/PrepareTestDataUtils.ts   | 14 ++--
 .../support/utils/ProcessingElementTestUtils.ts    | 17 +++--
 ui/cypress/support/utils/connect/ConnectUtils.ts   | 61 +++++++---------
 ui/cypress/support/utils/datalake/DataLakeUtils.ts | 16 +++--
 .../adapter/createAdapterWithoutStarting.spec.ts   |  5 +-
 ui/cypress/tests/adapter/fileStream.smoke.spec.ts  |  1 -
 ui/cypress/tests/adapter/formats/format.spec.ts    |  4 +-
 .../tests/adapter/persistInDataLake.smoke.spec.ts  |  1 -
 .../tests/adapter/rules/schemaRules.smoke.spec.ts  |  4 +-
 ui/cypress/tests/adapter/rules/streamRules.spec.ts |  4 +-
 ui/cypress/tests/adapter/rules/valueRules.ts       |  9 ++-
 .../dataDownloadDialogTest.smoke.spec.ts           |  1 +
 ui/cypress/tests/datalake/deleteWidget.ts          |  1 -
 .../datalake/widgetDataConfiguration.smoke.spec.ts |  2 -
 .../pipeline-element-runtime-info.component.html   |  2 +-
 .../adapter-started-dialog.component.html          |  3 +-
 27 files changed, 189 insertions(+), 97 deletions(-)


[streampipes] 03/03: [#1132] Remove set adapters from e2e tests

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1132
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 73422d4a41e36132e9f0ba3ff62480bc62b42e27
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Mon Jan 23 10:55:41 2023 +0100

    [#1132] Remove set adapters from e2e tests
---
 .../support/builder/GenericAdapterBuilder.ts       |  2 +-
 .../support/utils/DataDownloadDialogUtils.ts       |  2 -
 ui/cypress/support/utils/PrepareTestDataUtils.ts   | 14 +++---
 .../support/utils/ProcessingElementTestUtils.ts    | 17 +++++--
 ui/cypress/support/utils/connect/ConnectUtils.ts   | 55 ++++++++--------------
 ui/cypress/support/utils/datalake/DataLakeUtils.ts | 16 ++++---
 .../adapter/createAdapterWithoutStarting.spec.ts   |  5 +-
 ui/cypress/tests/adapter/formats/format.spec.ts    |  4 +-
 .../dataDownloadDialogTest.smoke.spec.ts           |  1 +
 ui/cypress/tests/datalake/deleteWidget.ts          |  1 -
 .../datalake/widgetDataConfiguration.smoke.spec.ts |  2 -
 .../pipeline-element-runtime-info.component.html   |  2 +-
 .../adapter-started-dialog.component.html          |  3 +-
 13 files changed, 52 insertions(+), 72 deletions(-)

diff --git a/ui/cypress/support/builder/GenericAdapterBuilder.ts b/ui/cypress/support/builder/GenericAdapterBuilder.ts
index eb6a0b310..478a16748 100644
--- a/ui/cypress/support/builder/GenericAdapterBuilder.ts
+++ b/ui/cypress/support/builder/GenericAdapterBuilder.ts
@@ -76,7 +76,7 @@ export class GenericAdapterBuilder {
         return this;
     }
 
-    public setFormat(format: 'csv' | 'json_array') {
+    public setFormat(format: 'csv' | 'json_array' | 'json_object') {
         this.genericAdapterInput.format = format;
         return this;
     }
diff --git a/ui/cypress/support/utils/DataDownloadDialogUtils.ts b/ui/cypress/support/utils/DataDownloadDialogUtils.ts
index 4ca932952..885052d67 100644
--- a/ui/cypress/support/utils/DataDownloadDialogUtils.ts
+++ b/ui/cypress/support/utils/DataDownloadDialogUtils.ts
@@ -50,12 +50,10 @@ export class DataDownloadDialogUtils {
         cy.dataCy('download-configuration-next-btn').click();
 
         // Format
-        // cy.dataCy('download-configuration-json').click();
         cy.dataCy(
             `download-configuration-${exportConfig.formatExportConfig.exportFormat}`,
         ).click();
         if ('delimiter' in exportConfig.formatExportConfig) {
-            // if ((exportConfig.formatExportConfig as CsvFormatExportConfig).delimiter !== undefined) {
             cy.dataCy(
                 `download-configuration-delimiter-${
                     (exportConfig.formatExportConfig as CsvFormatExportConfig)
diff --git a/ui/cypress/support/utils/PrepareTestDataUtils.ts b/ui/cypress/support/utils/PrepareTestDataUtils.ts
index 9e1bcb260..6ff139280 100644
--- a/ui/cypress/support/utils/PrepareTestDataUtils.ts
+++ b/ui/cypress/support/utils/PrepareTestDataUtils.ts
@@ -37,13 +37,8 @@ export class PrepareTestDataUtils {
             format,
             storeInDataLake,
         );
-        // ConnectUtils.addGenericStreamAdapter(adapter, 'sp-connect-adapter-gathering-live-preview');
-        ConnectUtils.addGenericStreamAdapter(adapter);
 
-        // Wait till data is stored
-        // if (storeInDataLake) {
-        //     cy.wait(10000);
-        // }
+        ConnectUtils.addGenericStreamAdapter(adapter);
     }
 
     private static getDataLakeTestAdapter(
@@ -54,7 +49,11 @@ export class PrepareTestDataUtils {
         const adapterBuilder = GenericAdapterBuilder.create('File_Stream')
             .setName(name)
             .setTimestampProperty('timestamp')
-            .addProtocolInput('input', 'speed', '1');
+            .addProtocolInput(
+                'radio',
+                'speed',
+                'fastest_\\(ignore_original_time\\)',
+            );
 
         if (format === 'csv') {
             adapterBuilder
@@ -69,7 +68,6 @@ export class PrepareTestDataUtils {
 
         if (storeInDataLake) {
             adapterBuilder.setStoreInDataLake();
-            adapterBuilder.setStartAdapter(false);
         }
 
         return adapterBuilder.build();
diff --git a/ui/cypress/support/utils/ProcessingElementTestUtils.ts b/ui/cypress/support/utils/ProcessingElementTestUtils.ts
index 5292960a5..5e7406507 100644
--- a/ui/cypress/support/utils/ProcessingElementTestUtils.ts
+++ b/ui/cypress/support/utils/ProcessingElementTestUtils.ts
@@ -24,6 +24,7 @@ import { GenericAdapterBuilder } from '../builder/GenericAdapterBuilder';
 import { PipelineBuilder } from '../builder/PipelineBuilder';
 import { PipelineElementBuilder } from '../builder/PipelineElementBuilder';
 import { ProcessorTest } from '../model/ProcessorTest';
+import { ConnectBtns } from './connect/ConnectBtns';
 
 export class ProcessingElementTestUtils {
     public static testElement(pipelineElementTest: ProcessorTest) {
@@ -47,10 +48,16 @@ export class ProcessingElementTestUtils {
         const adapterName = pipelineElementTest.name.toLowerCase();
 
         // Build adapter
-        const adapterInputBuilder = GenericAdapterBuilder.create('File_Set')
+        const adapterInputBuilder = GenericAdapterBuilder.create('File_Stream')
             .setName(adapterName)
             .setTimestampProperty('timestamp')
-            .setFormat(formatType);
+            .setFormat(formatType)
+            .setStartAdapter(false)
+            .addProtocolInput(
+                'radio',
+                'speed',
+                'fastest_\\(ignore_original_time\\)',
+            );
 
         if (formatType === 'csv') {
             adapterInputBuilder
@@ -60,7 +67,7 @@ export class ProcessingElementTestUtils {
 
         const adapterInput = adapterInputBuilder.build();
 
-        ConnectUtils.addGenericSetAdapter(adapterInput);
+        ConnectUtils.addGenericStreamAdapter(adapterInput);
 
         // Build Pipeline
         const pipelineInput = PipelineBuilder.create(pipelineElementTest.name)
@@ -76,8 +83,8 @@ export class ProcessingElementTestUtils {
 
         PipelineUtils.addPipeline(pipelineInput);
 
-        // Wait till data is stored
-        cy.wait(10000);
+        ConnectUtils.goToConnect();
+        ConnectBtns.startAdapter().click();
 
         DataLakeUtils.checkResults(
             dataLakeIndex,
diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts b/ui/cypress/support/utils/connect/ConnectUtils.ts
index 902ecd1fa..614e3191e 100644
--- a/ui/cypress/support/utils/connect/ConnectUtils.ts
+++ b/ui/cypress/support/utils/connect/ConnectUtils.ts
@@ -30,7 +30,6 @@ import { ConnectBtns } from './ConnectBtns';
 export class ConnectUtils {
     public static testSpecificStreamAdapter(
         adapterConfiguration: SpecificAdapterInput,
-        successElement = 'sp-connect-adapter-live-preview',
     ) {
         ConnectUtils.goToConnect();
 
@@ -54,7 +53,7 @@ export class ConnectUtils {
 
         ConnectEventSchemaUtils.finishEventSchemaConfiguration();
 
-        ConnectUtils.startStreamAdapter(adapterConfiguration, successElement);
+        ConnectUtils.startStreamAdapter(adapterConfiguration);
     }
 
     public static testGenericStreamAdapter(
@@ -65,21 +64,12 @@ export class ConnectUtils {
 
     public static addGenericStreamAdapter(
         adapterConfiguration: GenericAdapterInput,
-        successElement = 'sp-connect-adapter-live-preview',
     ) {
         ConnectUtils.addGenericAdapter(adapterConfiguration);
 
-        ConnectUtils.startStreamAdapter(adapterConfiguration, successElement);
+        ConnectUtils.startStreamAdapter(adapterConfiguration);
     }
 
-    // public static addGenericSetAdapter(
-    //   adapterConfiguration: GenericAdapterInput,
-    // ) {
-    //   ConnectUtils.addGenericAdapter(adapterConfiguration);
-    //
-    //   ConnectUtils.startSetAdapter(adapterConfiguration);
-    // }
-
     public static addGenericAdapter(adapterConfiguration: GenericAdapterInput) {
         ConnectUtils.goToConnect();
 
@@ -188,24 +178,11 @@ export class ConnectUtils {
         cy.get('#event-schema-next-button').click();
     }
 
-    public static startStreamAdapter(
-        adapterInput: AdapterInput,
-        successElement = 'sp-connect-adapter-live-preview',
-    ) {
-        ConnectUtils.startAdapter(adapterInput, successElement);
+    public static startStreamAdapter(adapterInput: AdapterInput) {
+        ConnectUtils.startAdapter(adapterInput);
     }
 
-    // public static startSetAdapter(adapterInput: AdapterInput) {
-    //   ConnectUtils.startAdapter(
-    //     adapterInput,
-    //     'sp-connect-adapter-set-success',
-    //   );
-    // }
-
-    public static startAdapter(
-        adapterInput: AdapterInput,
-        successElement: string,
-    ) {
+    public static startAdapter(adapterInput: AdapterInput) {
         // Set adapter name
         cy.dataCy('sp-adapter-name').type(adapterInput.adapterName);
 
@@ -225,7 +202,15 @@ export class ConnectUtils {
 
         ConnectBtns.adapterSettingsStartAdapter().click();
 
-        cy.dataCy(successElement, { timeout: 60000 }).should('be.visible');
+        if (adapterInput.startAdapter) {
+            cy.dataCy('sp-connect-adapter-success-live-preview', {
+                timeout: 60000,
+            }).should('be.visible');
+        } else {
+            cy.dataCy('sp-connect-adapter-success-added', {
+                timeout: 60000,
+            }).should('be.visible');
+        }
 
         this.closeAdapterPreview();
     }
@@ -260,6 +245,7 @@ export class ConnectUtils {
                 'speed',
                 'fastest_\\(ignore_original_time\\)',
             )
+            .addProtocolInput('radio', 'replayonce', 'yes')
             .setName('Adapter to test rules')
             .setFormat('csv')
             .addFormatInput('input', 'delimiter', ';')
@@ -292,9 +278,9 @@ export class ConnectUtils {
         cy.get('div').contains('Values').parent().click();
 
         // Validate resulting event
-        cy.dataCy('sp-connect-adapter-live-preview', { timeout: 10000 }).should(
-            'be.visible',
-        );
+        cy.dataCy('sp-connect-adapter-success-live-preview', {
+            timeout: 10000,
+        }).should('be.visible');
 
         // validate that three event properties
         cy.get('.preview-row', { timeout: 10000 })
@@ -308,10 +294,7 @@ export class ConnectUtils {
         ignoreTime: boolean,
         waitTime = 0,
     ) {
-        ConnectUtils.startAdapter(
-            adapterConfiguration,
-            'sp-connect-adapter-gathering-live-preview',
-        );
+        ConnectUtils.startAdapter(adapterConfiguration);
 
         // Wait till data is stored
         cy.wait(waitTime);
diff --git a/ui/cypress/support/utils/datalake/DataLakeUtils.ts b/ui/cypress/support/utils/datalake/DataLakeUtils.ts
index ca6b7bce1..1dbf1ce89 100644
--- a/ui/cypress/support/utils/datalake/DataLakeUtils.ts
+++ b/ui/cypress/support/utils/datalake/DataLakeUtils.ts
@@ -34,10 +34,16 @@ export class DataLakeUtils {
         storeInDataLake: boolean = true,
         format: 'csv' | 'json_array',
     ) {
-        const adapterBuilder = GenericAdapterBuilder.create('File_Set')
+        const adapterBuilder = GenericAdapterBuilder.create('File_Stream')
             .setName(name)
             .setTimestampProperty('timestamp')
-            .addDimensionProperty('randomtext');
+            .addDimensionProperty('randomtext')
+            .addProtocolInput(
+                'radio',
+                'speed',
+                'fastest_\\(ignore_original_time\\)',
+            )
+            .setStartAdapter(true);
 
         if (format === 'csv') {
             adapterBuilder
@@ -67,12 +73,8 @@ export class DataLakeUtils {
             true,
             format,
         );
-        ConnectUtils.addGenericAdapter(adapter);
 
-        // Wait till data is stored
-        if (wait) {
-            cy.wait(10000);
-        }
+        ConnectUtils.addGenericStreamAdapter(adapter);
     }
 
     public static addDataViewAndWidget(
diff --git a/ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts b/ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts
index 303189078..6f7aea8ba 100644
--- a/ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts
+++ b/ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts
@@ -33,10 +33,7 @@ describe('Creates a new adapter without starting it', () => {
             .setStartAdapter(false)
             .build();
 
-        ConnectUtils.testSpecificStreamAdapter(
-            adapterInput,
-            'sp-connect-adapter-not-started-success',
-        );
+        ConnectUtils.testSpecificStreamAdapter(adapterInput);
 
         ConnectUtils.startAndValidateAdapter(7);
 
diff --git a/ui/cypress/tests/adapter/formats/format.spec.ts b/ui/cypress/tests/adapter/formats/format.spec.ts
index 38a2cc7d1..7e615e869 100644
--- a/ui/cypress/tests/adapter/formats/format.spec.ts
+++ b/ui/cypress/tests/adapter/formats/format.spec.ts
@@ -188,9 +188,7 @@ const navigateToFormatSelection = () => {
 
     ConnectUtils.selectAdapter('File_Stream');
 
-    ConnectUtils.configureAdapter(
-        UserInputBuilder.create().add('input', 'speed', '1').build(),
-    );
+    ConnectUtils.configureAdapter([]);
 };
 
 const validateResult = expected => {
diff --git a/ui/cypress/tests/dataDownloadDialog/dataDownloadDialogTest.smoke.spec.ts b/ui/cypress/tests/dataDownloadDialog/dataDownloadDialogTest.smoke.spec.ts
index de4dd1a3c..574509e07 100644
--- a/ui/cypress/tests/dataDownloadDialog/dataDownloadDialogTest.smoke.spec.ts
+++ b/ui/cypress/tests/dataDownloadDialog/dataDownloadDialogTest.smoke.spec.ts
@@ -28,6 +28,7 @@ describe('Test live data download dialog', () => {
             'dataDownloadDialog/input.json',
             'json_array',
         );
+
         DataLakeUtils.addDataViewAndTableWidget(dataViewName, 'Persist');
         DataLakeUtils.saveDataExplorerWidgetConfiguration();
     });
diff --git a/ui/cypress/tests/datalake/deleteWidget.ts b/ui/cypress/tests/datalake/deleteWidget.ts
index 8c4c23f0d..2de896dcd 100644
--- a/ui/cypress/tests/datalake/deleteWidget.ts
+++ b/ui/cypress/tests/datalake/deleteWidget.ts
@@ -21,7 +21,6 @@ describe('Test Table View in Data Explorer', () => {
     beforeEach('Setup Test', () => {
         cy.initStreamPipesTest();
         DataLakeUtils.loadDataIntoDataLake('datalake/sample.csv', false);
-        // cy.login();
     });
 
     it('Perform Test', () => {
diff --git a/ui/cypress/tests/datalake/widgetDataConfiguration.smoke.spec.ts b/ui/cypress/tests/datalake/widgetDataConfiguration.smoke.spec.ts
index 2cf5c4372..0568f70f5 100644
--- a/ui/cypress/tests/datalake/widgetDataConfiguration.smoke.spec.ts
+++ b/ui/cypress/tests/datalake/widgetDataConfiguration.smoke.spec.ts
@@ -24,8 +24,6 @@ describe('Test Table View in Data Explorer', () => {
     beforeEach('Setup Test', () => {
         cy.initStreamPipesTest();
         DataLakeUtils.loadDataIntoDataLake('datalake/sample.csv');
-        // cy.login();
-        // DataLakeUtils.goToDatalake();
     });
 
     it('Perform Test', () => {
diff --git a/ui/src/app/connect/components/runtime-info/pipeline-element-runtime-info.component.html b/ui/src/app/connect/components/runtime-info/pipeline-element-runtime-info.component.html
index 6cc21a8e2..f96025e3c 100644
--- a/ui/src/app/connect/components/runtime-info/pipeline-element-runtime-info.component.html
+++ b/ui/src/app/connect/components/runtime-info/pipeline-element-runtime-info.component.html
@@ -35,7 +35,7 @@
         </div>
     </div>
 </div>
-<div *ngIf="runtimeData" data-cy="sp-connect-adapter-live-preview">
+<div *ngIf="runtimeData" data-cy="sp-connect-adapter-success-live-preview">
     <p>Here is a preview of your data:</p>
     <table class="dataTable row-border hover">
         <thead>
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html
index bccae1295..4dd40f8a4 100644
--- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html
+++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html
@@ -76,8 +76,7 @@
                             fxLayout="row"
                         >
                             <i class="material-icons">done</i>
-                            <span
-                                data-cy="sp-connect-adapter-not-started-success"
+                            <span data-cy="sp-connect-adapter-success-added"
                                 >&nbsp;Your new data stream is now available in
                                 the pipeline editor.</span
                             >


[streampipes] 01/03: Merge branch 'SP-1121' into SP-1132

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1132
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 716e98ddffcc8a72f33e4baa1dfad2a61a7d6db2
Merge: dd6c07535 469513753
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Fri Jan 20 11:37:50 2023 +0100

    Merge branch 'SP-1121' into SP-1132

 .../iiot/protocol/stream/FileStreamProtocol.java   | 65 +++++++++++++++++++++-
 .../strings.en                                     | 15 ++++-
 ui/cypress/tests/adapter/fileStream.smoke.spec.ts  |  1 -
 .../tests/adapter/persistInDataLake.smoke.spec.ts  |  1 -
 4 files changed, 75 insertions(+), 7 deletions(-)


[streampipes] 02/03: [#1132] Fix preprocessing adapter rules

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1132
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit ddb280fcfc3fb619801986859c16807ab647cfd4
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Sat Jan 21 07:21:16 2023 +0100

    [#1132] Fix preprocessing adapter rules
---
 .../TransformValueAdapterPipelineElement.java      |  4 +++
 .../transform/value/ValueEventTransformer.java     | 11 +++++-
 .../iiot/protocol/stream/FileStreamProtocol.java   | 42 +++++++++++++++-------
 .../fixtures/connect/aggregationRules/expected.csv |  3 +-
 .../fixtures/connect/aggregationRules/input.csv    | 13 ++++---
 ui/cypress/fixtures/connect/schemaRules/input.csv  |  4 +--
 .../fixtures/connect/valueRules/expected.csv       |  2 +-
 ui/cypress/fixtures/connect/valueRules/input.csv   |  2 +-
 ui/cypress/support/utils/connect/ConnectUtils.ts   |  6 +++-
 .../tests/adapter/rules/schemaRules.smoke.spec.ts  |  4 +--
 ui/cypress/tests/adapter/rules/streamRules.spec.ts |  4 +--
 ui/cypress/tests/adapter/rules/valueRules.ts       |  9 +++--
 12 files changed, 74 insertions(+), 30 deletions(-)

diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
index ba284501f..f40eff1d7 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
@@ -89,4 +89,8 @@ public class TransformValueAdapterPipelineElement implements IAdapterPipelineEle
   public Map<String, Object> process(Map<String, Object> event) {
     return eventTransformer.transform(event);
   }
+
+  public ValueEventTransformer getEventTransformer() {
+    return eventTransformer;
+  }
 }
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
index 6516fa1ee..10fcf8f47 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
@@ -27,7 +27,7 @@ import java.util.Map;
 public class ValueEventTransformer implements ValueTransformationRule {
 
   private final List<UnitTransformationRule> unitTransformationRules;
-  private final List<TimestampTranformationRule> timestampTransformationRules;
+  private List<TimestampTranformationRule> timestampTransformationRules;
   private final List<CorrectionValueTransformationRule> correctionValueTransformationRules;
   private final List<DatatypeTransformationRule> datatypeTransformationRules;
 
@@ -71,4 +71,13 @@ public class ValueEventTransformer implements ValueTransformationRule {
 
     return event;
   }
+
+  public List<TimestampTranformationRule> getTimestampTransformationRules() {
+    return timestampTransformationRules;
+  }
+
+  public void setTimestampTransformationRules(
+      List<TimestampTranformationRule> timestampTransformationRules) {
+    this.timestampTransformationRules = timestampTransformationRules;
+  }
 }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 1c9fd49ff..b726cebff 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -29,6 +29,7 @@ import org.apache.streampipes.extensions.api.connect.exception.ParseException;
 import org.apache.streampipes.extensions.management.connect.adapter.guess.SchemaGuesser;
 import org.apache.streampipes.extensions.management.connect.adapter.model.generic.Protocol;
 import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.AddTimestampPipelineElement;
+import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.TransformValueAdapterPipelineElement;
 import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.transform.value.TimestampTranformationRule;
 import org.apache.streampipes.extensions.management.util.EventSchemaUtils;
 import org.apache.streampipes.model.AdapterType;
@@ -83,7 +84,7 @@ public class FileStreamProtocol extends Protocol {
   private int timeBetweenReplay;
 
   private Optional<IAdapterPipelineElement> addTimestampRule;
-  private Optional<IAdapterPipelineElement> transformationTimestampRule;
+  private Optional<List<TimestampTranformationRule>> transformationTimestampRule;
 
   private ScheduledExecutorService executor;
 
@@ -105,12 +106,11 @@ public class FileStreamProtocol extends Protocol {
     this.replayOnce = replayOnce;
   }
 
-  private Optional<IAdapterPipelineElement> checkAndRemovePipelineElement(
-      List<IAdapterPipelineElement> pipelineElements,
-      Class elementType) {
+  private Optional<IAdapterPipelineElement> checkAndRemoveAddTimestampPipelineElement(
+      List<IAdapterPipelineElement> pipelineElements) {
 
     var pipelineElement = pipelineElements.stream()
-        .filter(o -> o.getClass() == elementType)
+        .filter(o -> o.getClass() == AddTimestampPipelineElement.class)
         .findFirst();
 
     pipelineElement.ifPresent(pipelineElements::remove);
@@ -118,17 +118,33 @@ public class FileStreamProtocol extends Protocol {
     return pipelineElement;
   }
 
+  private Optional<List<TimestampTranformationRule>> checkAndRemoveChangeTimestampPipelineElement(
+      List<IAdapterPipelineElement> pipelineElements) {
+
+    var pipelineElement = pipelineElements.stream()
+        .filter(o -> o.getClass() == TransformValueAdapterPipelineElement.class)
+        .map(pe -> (TransformValueAdapterPipelineElement) pe)
+        .findFirst();
+
+    if (pipelineElement.isPresent()) {
+      var eventTransformer = pipelineElement.get().getEventTransformer();
+      var result = eventTransformer.getTimestampTransformationRules();
+      eventTransformer.setTimestampTransformationRules(List.of());
+
+      return Optional.of(result);
+    }
+    return Optional.empty();
+  }
+
   @Override
   public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
     String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema());
 
-    addTimestampRule = checkAndRemovePipelineElement(
-        adapterPipeline.getPipelineElements(),
-        AddTimestampPipelineElement.class);
+    addTimestampRule = checkAndRemoveAddTimestampPipelineElement(
+        adapterPipeline.getPipelineElements());
 
-    transformationTimestampRule = checkAndRemovePipelineElement(
-        adapterPipeline.getPipelineElements(),
-        TimestampTranformationRule.class);
+    transformationTimestampRule = checkAndRemoveChangeTimestampPipelineElement(
+        adapterPipeline.getPipelineElements());
 
 
     var eventProcessor = new LocalEventProcessor(adapterPipeline, timestampKey);
@@ -187,7 +203,9 @@ public class FileStreamProtocol extends Protocol {
         }
 
         if (transformationTimestampRule.isPresent()) {
-          eventMap = transformationTimestampRule.get().process(eventMap);
+          for (var rule : transformationTimestampRule.get()) {
+            rule.transform(eventMap);
+          }
         }
 
         long actualEventTimestamp = (long) eventMap.get(timestampKey);
diff --git a/ui/cypress/fixtures/connect/aggregationRules/expected.csv b/ui/cypress/fixtures/connect/aggregationRules/expected.csv
index 0e9ad9e3b..bcb273f68 100644
--- a/ui/cypress/fixtures/connect/aggregationRules/expected.csv
+++ b/ui/cypress/fixtures/connect/aggregationRules/expected.csv
@@ -1,3 +1,2 @@
 timestamp;value
-1623871501002;4.0
-1623871503004;2.0
+1623871499000;2.0
diff --git a/ui/cypress/fixtures/connect/aggregationRules/input.csv b/ui/cypress/fixtures/connect/aggregationRules/input.csv
index eaa79cf3b..61c507d80 100644
--- a/ui/cypress/fixtures/connect/aggregationRules/input.csv
+++ b/ui/cypress/fixtures/connect/aggregationRules/input.csv
@@ -1,6 +1,11 @@
 timestamp;value
 1623871499000;2.0
-1623871500001;3.0
-1623871501002;4.0
-1623871502003;5.0
-1623871503004;2.0
+1623871500000;3.0
+1623871501000;4.0
+1623871502000;5.0
+1623871503000;2.0
+1623871504000;3.0
+1623871505000;4.0
+1623871506000;5.0
+1623871507000;6.0
+1623871508000;7.0
diff --git a/ui/cypress/fixtures/connect/schemaRules/input.csv b/ui/cypress/fixtures/connect/schemaRules/input.csv
index 9a34acb46..f92f0b8ce 100644
--- a/ui/cypress/fixtures/connect/schemaRules/input.csv
+++ b/ui/cypress/fixtures/connect/schemaRules/input.csv
@@ -1,2 +1,2 @@
-timestamp;count;density;temperature
-1674159690000;122.0;62.0;11
+count;density;temperature
+122.0;62.0;11
diff --git a/ui/cypress/fixtures/connect/valueRules/expected.csv b/ui/cypress/fixtures/connect/valueRules/expected.csv
index d2b632c68..3ddfebcdc 100644
--- a/ui/cypress/fixtures/connect/valueRules/expected.csv
+++ b/ui/cypress/fixtures/connect/valueRules/expected.csv
@@ -1,2 +1,2 @@
 timestamp;temperature;value
-1623871490900;50.003334045410156;100.0
+1640346912123;50.003334045410156;100.0
diff --git a/ui/cypress/fixtures/connect/valueRules/input.csv b/ui/cypress/fixtures/connect/valueRules/input.csv
index da819853b..8f58cc7e8 100644
--- a/ui/cypress/fixtures/connect/valueRules/input.csv
+++ b/ui/cypress/fixtures/connect/valueRules/input.csv
@@ -1,2 +1,2 @@
 timestamp;value;temperature
-1623871490900;10.0;10.0
+2021-12-24T12:55:12.123Z+0100;10.0;10.0
diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts b/ui/cypress/support/utils/connect/ConnectUtils.ts
index f80cf3c0f..902ecd1fa 100644
--- a/ui/cypress/support/utils/connect/ConnectUtils.ts
+++ b/ui/cypress/support/utils/connect/ConnectUtils.ts
@@ -255,7 +255,11 @@ export class ConnectUtils {
         const adapterConfiguration = GenericAdapterBuilder.create('File_Stream')
             .setStoreInDataLake()
             .setTimestampProperty('timestamp')
-            .addProtocolInput('input', 'speed', '1')
+            .addProtocolInput(
+                'radio',
+                'speed',
+                'fastest_\\(ignore_original_time\\)',
+            )
             .setName('Adapter to test rules')
             .setFormat('csv')
             .addFormatInput('input', 'delimiter', ';')
diff --git a/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts b/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts
index 3ad0e3fda..3399ffd41 100644
--- a/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts
+++ b/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts
@@ -41,8 +41,8 @@ describe('Connect schema rule transformations', () => {
             'Integer',
         );
 
-        // Mark property as timestamp
-        ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
+        // Add a timestamp property
+        ConnectEventSchemaUtils.addTimestampProperty();
 
         ConnectEventSchemaUtils.finishEventSchemaConfiguration();
 
diff --git a/ui/cypress/tests/adapter/rules/streamRules.spec.ts b/ui/cypress/tests/adapter/rules/streamRules.spec.ts
index 8b5d2469a..6c874f594 100644
--- a/ui/cypress/tests/adapter/rules/streamRules.spec.ts
+++ b/ui/cypress/tests/adapter/rules/streamRules.spec.ts
@@ -38,7 +38,7 @@ describe('Connect aggregation rule transformations', () => {
             adapterConfiguration,
             'cypress/fixtures/connect/aggregationRules/expected.csv',
             false,
-            5000,
+            2000,
         );
     });
 });
@@ -62,7 +62,7 @@ describe('Remove duplicates rule transformations', () => {
             adapterConfiguration,
             'cypress/fixtures/connect/removeDuplicateRules/expected.csv',
             false,
-            4000,
+            2000,
         );
     });
 });
diff --git a/ui/cypress/tests/adapter/rules/valueRules.ts b/ui/cypress/tests/adapter/rules/valueRules.ts
index 9a5d8b1fe..d53de16de 100644
--- a/ui/cypress/tests/adapter/rules/valueRules.ts
+++ b/ui/cypress/tests/adapter/rules/valueRules.ts
@@ -29,6 +29,12 @@ describe('Connect value rule transformations', () => {
     it('Perform Test', () => {
         const adapterConfiguration = ConnectUtils.setUpPreprocessingRuleTest();
 
+        // Edit timestamp property
+        ConnectEventSchemaUtils.editTimestampProperty(
+            'timestamp',
+            "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
+        );
+
         // Number transformation
         ConnectEventSchemaUtils.numberTransformation('value', '10');
 
@@ -39,14 +45,13 @@ describe('Connect value rule transformations', () => {
             'Degree Fahrenheit',
         );
 
-        ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
-
         ConnectEventSchemaUtils.finishEventSchemaConfiguration();
 
         ConnectUtils.tearDownPreprocessingRuleTest(
             adapterConfiguration,
             'cypress/fixtures/connect/valueRules/expected.csv',
             false,
+            2000,
         );
     });
 });