You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/12/28 08:44:13 UTC

[incubator-streampipes] branch STREAMPIPES-483 updated: [STREAMPIPES-483] Add e2e tests for stream transformation rules in connect

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-483
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-483 by this push:
     new 08bac9b  [STREAMPIPES-483] Add e2e tests for stream transformation rules in connect
08bac9b is described below

commit 08bac9b5c804057788c17b4e6bca9c6ff4358f59
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Dec 28 09:43:49 2021 +0100

    [STREAMPIPES-483] Add e2e tests for stream transformation rules in connect
---
 .../streampipes/connect/adapter/Adapter.java       |  1 +
 .../stream}/DuplicateFilterPipelineElement.java    |  2 +-
 .../stream/EventRateTransformationRule.java        |  5 ---
 .../generic/elements/DuplicateFilterTest.java      |  2 +-
 .../fixtures/connect/aggregationRules/expected.csv |  2 ++
 .../fixtures/connect/aggregationRules/input.csv    | 11 ++++++
 .../connect/removeDuplicateRules/expected.csv      |  6 ++++
 .../connect/removeDuplicateRules/input.csv         | 11 ++++++
 ui/cypress/support/utils/ConnectUtils.ts           |  1 -
 ui/cypress/tests/adapter/schemaRules.smoke.ts      |  2 ++
 .../{valueRules.smoke.ts => streamRules.smoke.ts}  | 41 +++++++++++++++-------
 ui/cypress/tests/adapter/valueRules.smoke.ts       |  2 ++
 .../start-adapter-configuration.component.html     | 12 ++++---
 .../start-adapter-configuration.component.ts       |  2 +-
 14 files changed, 74 insertions(+), 26 deletions(-)

diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index e6336ca..52344b1 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.connect.adapter;
 import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.config.backend.SpProtocol;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
+import org.apache.streampipes.connect.adapter.preprocessing.transform.stream.DuplicateFilterPipelineElement;
 import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.connect.adapter.preprocessing.elements.*;
 import org.apache.streampipes.connect.api.IAdapter;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/DuplicateFilterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/DuplicateFilterPipelineElement.java
similarity index 97%
rename from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/DuplicateFilterPipelineElement.java
rename to streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/DuplicateFilterPipelineElement.java
index ce223f2..9ae0b6e 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/DuplicateFilterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/DuplicateFilterPipelineElement.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.connect.adapter.preprocessing.elements;
+package org.apache.streampipes.connect.adapter.preprocessing.transform.stream;
 
 import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/EventRateTransformationRule.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/EventRateTransformationRule.java
index 86b4a88..f7f58cd 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/EventRateTransformationRule.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/EventRateTransformationRule.java
@@ -28,7 +28,6 @@ public class EventRateTransformationRule implements StreamTransformationRule {
     //none (Values from last event), max, min, mean, sum (of the values in the time window)
     private String aggregationType;
 
-    private LinkedList<Map<String, Object>> eventStorage = new LinkedList<>();
     private long lastSentToPipelineTimestamp = System.currentTimeMillis();
 
     public EventRateTransformationRule(long aggregationTimeWindow, String aggregationType) {
@@ -38,10 +37,6 @@ public class EventRateTransformationRule implements StreamTransformationRule {
 
     @Override
     public Map<String, Object> transform(Map<String, Object> event) {
-        if (!aggregationType.equals("none")) {
-            eventStorage.add(event);
-        }
-
         if (System.currentTimeMillis() > lastSentToPipelineTimestamp + aggregationTimeWindow) {
             switch (aggregationType) {
                 case "none":
diff --git a/streampipes-extensions/streampipes-connect-adapters/src/test/java/org/apache/streampipes/connect/adapters/generic/elements/DuplicateFilterTest.java b/streampipes-extensions/streampipes-connect-adapters/src/test/java/org/apache/streampipes/connect/adapters/generic/elements/DuplicateFilterTest.java
index d5579fb..56ea242 100644
--- a/streampipes-extensions/streampipes-connect-adapters/src/test/java/org/apache/streampipes/connect/adapters/generic/elements/DuplicateFilterTest.java
+++ b/streampipes-extensions/streampipes-connect-adapters/src/test/java/org/apache/streampipes/connect/adapters/generic/elements/DuplicateFilterTest.java
@@ -19,7 +19,7 @@
 package org.apache.streampipes.connect.adapters.generic.elements;
 
 import org.junit.Test;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.DuplicateFilterPipelineElement;
+import org.apache.streampipes.connect.adapter.preprocessing.transform.stream.DuplicateFilterPipelineElement;
 
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
diff --git a/ui/cypress/fixtures/connect/aggregationRules/expected.csv b/ui/cypress/fixtures/connect/aggregationRules/expected.csv
new file mode 100644
index 0000000..ab5ca59
--- /dev/null
+++ b/ui/cypress/fixtures/connect/aggregationRules/expected.csv
@@ -0,0 +1,2 @@
+time;value
+1623871499000;2.0
diff --git a/ui/cypress/fixtures/connect/aggregationRules/input.csv b/ui/cypress/fixtures/connect/aggregationRules/input.csv
new file mode 100644
index 0000000..61c507d
--- /dev/null
+++ b/ui/cypress/fixtures/connect/aggregationRules/input.csv
@@ -0,0 +1,11 @@
+timestamp;value
+1623871499000;2.0
+1623871500000;3.0
+1623871501000;4.0
+1623871502000;5.0
+1623871503000;2.0
+1623871504000;3.0
+1623871505000;4.0
+1623871506000;5.0
+1623871507000;6.0
+1623871508000;7.0
diff --git a/ui/cypress/fixtures/connect/removeDuplicateRules/expected.csv b/ui/cypress/fixtures/connect/removeDuplicateRules/expected.csv
new file mode 100644
index 0000000..89bf952
--- /dev/null
+++ b/ui/cypress/fixtures/connect/removeDuplicateRules/expected.csv
@@ -0,0 +1,6 @@
+time;value
+1623871499000;2.0
+1623871500000;3.0
+1623871501000;4.0
+1623871505000;4.0
+1623871507000;6.0
diff --git a/ui/cypress/fixtures/connect/removeDuplicateRules/input.csv b/ui/cypress/fixtures/connect/removeDuplicateRules/input.csv
new file mode 100644
index 0000000..b111f7f
--- /dev/null
+++ b/ui/cypress/fixtures/connect/removeDuplicateRules/input.csv
@@ -0,0 +1,11 @@
+timestamp;value
+1623871499000;2.0
+1623871499000;2.0
+1623871500000;3.0
+1623871501000;4.0
+1623871501000;4.0
+1623871505000;4.0
+1623871505000;4.0
+1623871499000;2.0
+1623871507000;6.0
+1623871507000;6.0
diff --git a/ui/cypress/support/utils/ConnectUtils.ts b/ui/cypress/support/utils/ConnectUtils.ts
index 0f45e99..d457d59 100644
--- a/ui/cypress/support/utils/ConnectUtils.ts
+++ b/ui/cypress/support/utils/ConnectUtils.ts
@@ -206,7 +206,6 @@ export class ConnectUtils {
   public static tearDownPreprocessingRuleTest(adapterConfiguration: AdapterInput,
                                               expectedFile: string,
                                               ignoreTime: boolean) {
-    ConnectEventSchemaUtils.finishEventSchemaConfiguration();
 
     ConnectUtils.startSetAdapter(adapterConfiguration);
 
diff --git a/ui/cypress/tests/adapter/schemaRules.smoke.ts b/ui/cypress/tests/adapter/schemaRules.smoke.ts
index 01418dc..cb19b52 100644
--- a/ui/cypress/tests/adapter/schemaRules.smoke.ts
+++ b/ui/cypress/tests/adapter/schemaRules.smoke.ts
@@ -42,6 +42,8 @@ describe('Connect schema rule transformations', () => {
         // Add a timestamp property
         ConnectEventSchemaUtils.addTimestampProperty();
 
+        ConnectEventSchemaUtils.finishEventSchemaConfiguration();
+
         ConnectUtils.tearDownPreprocessingRuleTest(adapterConfiguration,
             'cypress/fixtures/connect/schemaRules/expected.csv',
             true);
diff --git a/ui/cypress/tests/adapter/valueRules.smoke.ts b/ui/cypress/tests/adapter/streamRules.smoke.ts
similarity index 51%
copy from ui/cypress/tests/adapter/valueRules.smoke.ts
copy to ui/cypress/tests/adapter/streamRules.smoke.ts
index c73b578..37dbb25 100644
--- a/ui/cypress/tests/adapter/valueRules.smoke.ts
+++ b/ui/cypress/tests/adapter/streamRules.smoke.ts
@@ -20,31 +20,46 @@ import { ConnectUtils } from '../../support/utils/ConnectUtils';
 import { FileManagementUtils } from '../../support/utils/FileManagementUtils';
 import { ConnectEventSchemaUtils } from '../../support/utils/ConnectEventSchemaUtils';
 
-describe('Connect value rule transformations', () => {
+describe('Connect aggregation rule transformations', () => {
     beforeEach('Setup Test', () => {
         cy.initStreamPipesTest();
-        FileManagementUtils.addFile('connect/valueRules/input.csv');
+        FileManagementUtils.addFile('connect/aggregationRules/input.csv');
     });
 
     it('Perform Test', () => {
 
         const adapterConfiguration = ConnectUtils.setUpPreprocessingRuleTest();
 
-        // Edit timestamp property
-        ConnectEventSchemaUtils.editTimestampProperty('timestamp',
-            'yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\'');
 
-        // Number transformation
-        ConnectEventSchemaUtils.numberTransformation('value', '10');
-
-        // Unit transformation
-        ConnectEventSchemaUtils.unitTransformation('temperature',
-            'Degree Celsius',
-            'Degree Fahrenheit');
+        ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
+        ConnectEventSchemaUtils.finishEventSchemaConfiguration();
+        cy.dataCy('connect-reduce-event-rate-box').children().click();
+        cy.dataCy('connect-reduce-event-input').type('3000');
 
         ConnectUtils.tearDownPreprocessingRuleTest(adapterConfiguration,
-            'cypress/fixtures/connect/valueRules/expected.csv',
+            'cypress/fixtures/connect/aggregationRules/expected.csv',
             false);
     });
+});
+
+describe('Remove duplicates rule transformations', () => {
+    beforeEach('Setup Test', () => {
+        cy.initStreamPipesTest();
+        FileManagementUtils.addFile('connect/removeDuplicateRules/input.csv');
+    });
+
+    it('Perform Test', () => {
 
+        const adapterConfiguration = ConnectUtils.setUpPreprocessingRuleTest();
+
+        ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
+        ConnectEventSchemaUtils.finishEventSchemaConfiguration();
+
+        cy.dataCy('connect-remove-duplicates-box').children().click();
+        cy.dataCy('connect-remove-duplicates-input').type('10000');
+
+        ConnectUtils.tearDownPreprocessingRuleTest(adapterConfiguration,
+            'cypress/fixtures/connect/removeDuplicateRules/expected.csv',
+            false);
+    });
 });
diff --git a/ui/cypress/tests/adapter/valueRules.smoke.ts b/ui/cypress/tests/adapter/valueRules.smoke.ts
index c73b578..e4c8f1e 100644
--- a/ui/cypress/tests/adapter/valueRules.smoke.ts
+++ b/ui/cypress/tests/adapter/valueRules.smoke.ts
@@ -42,6 +42,8 @@ describe('Connect value rule transformations', () => {
             'Degree Celsius',
             'Degree Fahrenheit');
 
+        ConnectEventSchemaUtils.finishEventSchemaConfiguration();
+
         ConnectUtils.tearDownPreprocessingRuleTest(adapterConfiguration,
             'cypress/fixtures/connect/valueRules/expected.csv',
             false);
diff --git a/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.html b/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.html
index 1c063ea..8f02966 100644
--- a/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.html
+++ b/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.html
@@ -45,22 +45,26 @@
 
 
             <mat-checkbox [(ngModel)]="removeDuplicates"
-                          [ngModelOptions]="{standalone: true}">Remove Duplicates
+                          [ngModelOptions]="{standalone: true}"
+                          data-cy="connect-remove-duplicates-box">Remove Duplicates
             </mat-checkbox>
             <mat-form-field *ngIf="removeDuplicates" color="accent">
                 <input matInput id="input-removeDuplicatesTime"
                        [ngModelOptions]="{standalone: true}" placeholder="Remove Duplicates Time Window"
-                       [(ngModel)]="removeDuplicatesTime">
+                       [(ngModel)]="removeDuplicatesTime"
+                       data-cy="connect-remove-duplicates-input">
             </mat-form-field>
 
             <mat-checkbox [(ngModel)]="eventRateReduction"
                           [ngModelOptions]="{standalone: true}"
-                          matTooltip="Send maximum one event in the specified time window">Reduce the event rate
+                          matTooltip="Send maximum one event in the specified time window"
+                          data-cy="connect-reduce-event-rate-box">Reduce the event rate
             </mat-checkbox>
             <mat-form-field *ngIf="eventRateReduction" color="accent">
                 <input type="number" matInput id="input-evenRateTime"
                        [ngModelOptions]="{standalone: true}" [(ngModel)]="eventRateTime"
-                       placeholder="Time Window (Milliseconds)" matTooltipPosition="above">
+                       placeholder="Time Window (Milliseconds)" matTooltipPosition="above"
+                       data-cy="connect-reduce-event-input">
             </mat-form-field>
             <mat-form-field *ngIf="eventRateReduction" color="accent">
                 <mat-label>Event Aggregation</mat-label>
diff --git a/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.ts b/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.ts
index 21ed441..0a37a08 100644
--- a/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.ts
+++ b/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.ts
@@ -123,7 +123,7 @@ export class StartAdapterConfigurationComponent implements OnInit {
     if (this.eventRateReduction) {
       const eventRate: EventRateTransformationRuleDescription = new EventRateTransformationRuleDescription();
       eventRate['@class'] = 'org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription';
-      eventRate.aggregationTimeWindow = this.eventRateMode as any;
+      eventRate.aggregationTimeWindow = this.eventRateTime;
       eventRate.aggregationType = this.eventRateMode;
       this.adapterDescription.rules.push(eventRate);
     }