You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/12/28 08:44:13 UTC
[incubator-streampipes] branch STREAMPIPES-483 updated: [STREAMPIPES-483] Add e2e tests for stream transformation rules in connect
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-483
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/STREAMPIPES-483 by this push:
new 08bac9b [STREAMPIPES-483] Add e2e tests for stream transformation rules in connect
08bac9b is described below
commit 08bac9b5c804057788c17b4e6bca9c6ff4358f59
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Dec 28 09:43:49 2021 +0100
[STREAMPIPES-483] Add e2e tests for stream transformation rules in connect
---
.../streampipes/connect/adapter/Adapter.java | 1 +
.../stream}/DuplicateFilterPipelineElement.java | 2 +-
.../stream/EventRateTransformationRule.java | 5 ---
.../generic/elements/DuplicateFilterTest.java | 2 +-
.../fixtures/connect/aggregationRules/expected.csv | 2 ++
.../fixtures/connect/aggregationRules/input.csv | 11 ++++++
.../connect/removeDuplicateRules/expected.csv | 6 ++++
.../connect/removeDuplicateRules/input.csv | 11 ++++++
ui/cypress/support/utils/ConnectUtils.ts | 1 -
ui/cypress/tests/adapter/schemaRules.smoke.ts | 2 ++
.../{valueRules.smoke.ts => streamRules.smoke.ts} | 41 +++++++++++++++-------
ui/cypress/tests/adapter/valueRules.smoke.ts | 2 ++
.../start-adapter-configuration.component.html | 12 ++++---
.../start-adapter-configuration.component.ts | 2 +-
14 files changed, 74 insertions(+), 26 deletions(-)
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index e6336ca..52344b1 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.connect.adapter;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.config.backend.SpProtocol;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
+import org.apache.streampipes.connect.adapter.preprocessing.transform.stream.DuplicateFilterPipelineElement;
import org.apache.streampipes.connect.api.IAdapterPipelineElement;
import org.apache.streampipes.connect.adapter.preprocessing.elements.*;
import org.apache.streampipes.connect.api.IAdapter;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/DuplicateFilterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/DuplicateFilterPipelineElement.java
similarity index 97%
rename from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/DuplicateFilterPipelineElement.java
rename to streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/DuplicateFilterPipelineElement.java
index ce223f2..9ae0b6e 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/DuplicateFilterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/DuplicateFilterPipelineElement.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.connect.adapter.preprocessing.elements;
+package org.apache.streampipes.connect.adapter.preprocessing.transform.stream;
import org.apache.streampipes.connect.api.IAdapterPipelineElement;
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/EventRateTransformationRule.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/EventRateTransformationRule.java
index 86b4a88..f7f58cd 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/EventRateTransformationRule.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/stream/EventRateTransformationRule.java
@@ -28,7 +28,6 @@ public class EventRateTransformationRule implements StreamTransformationRule {
//none (Values from last event), max, min, mean, sum (of the values in the time window)
private String aggregationType;
- private LinkedList<Map<String, Object>> eventStorage = new LinkedList<>();
private long lastSentToPipelineTimestamp = System.currentTimeMillis();
public EventRateTransformationRule(long aggregationTimeWindow, String aggregationType) {
@@ -38,10 +37,6 @@ public class EventRateTransformationRule implements StreamTransformationRule {
@Override
public Map<String, Object> transform(Map<String, Object> event) {
- if (!aggregationType.equals("none")) {
- eventStorage.add(event);
- }
-
if (System.currentTimeMillis() > lastSentToPipelineTimestamp + aggregationTimeWindow) {
switch (aggregationType) {
case "none":
diff --git a/streampipes-extensions/streampipes-connect-adapters/src/test/java/org/apache/streampipes/connect/adapters/generic/elements/DuplicateFilterTest.java b/streampipes-extensions/streampipes-connect-adapters/src/test/java/org/apache/streampipes/connect/adapters/generic/elements/DuplicateFilterTest.java
index d5579fb..56ea242 100644
--- a/streampipes-extensions/streampipes-connect-adapters/src/test/java/org/apache/streampipes/connect/adapters/generic/elements/DuplicateFilterTest.java
+++ b/streampipes-extensions/streampipes-connect-adapters/src/test/java/org/apache/streampipes/connect/adapters/generic/elements/DuplicateFilterTest.java
@@ -19,7 +19,7 @@
package org.apache.streampipes.connect.adapters.generic.elements;
import org.junit.Test;
-import org.apache.streampipes.connect.adapter.preprocessing.elements.DuplicateFilterPipelineElement;
+import org.apache.streampipes.connect.adapter.preprocessing.transform.stream.DuplicateFilterPipelineElement;
import java.util.LinkedHashMap;
import java.util.LinkedList;
diff --git a/ui/cypress/fixtures/connect/aggregationRules/expected.csv b/ui/cypress/fixtures/connect/aggregationRules/expected.csv
new file mode 100644
index 0000000..ab5ca59
--- /dev/null
+++ b/ui/cypress/fixtures/connect/aggregationRules/expected.csv
@@ -0,0 +1,2 @@
+time;value
+1623871499000;2.0
diff --git a/ui/cypress/fixtures/connect/aggregationRules/input.csv b/ui/cypress/fixtures/connect/aggregationRules/input.csv
new file mode 100644
index 0000000..61c507d
--- /dev/null
+++ b/ui/cypress/fixtures/connect/aggregationRules/input.csv
@@ -0,0 +1,11 @@
+timestamp;value
+1623871499000;2.0
+1623871500000;3.0
+1623871501000;4.0
+1623871502000;5.0
+1623871503000;2.0
+1623871504000;3.0
+1623871505000;4.0
+1623871506000;5.0
+1623871507000;6.0
+1623871508000;7.0
diff --git a/ui/cypress/fixtures/connect/removeDuplicateRules/expected.csv b/ui/cypress/fixtures/connect/removeDuplicateRules/expected.csv
new file mode 100644
index 0000000..89bf952
--- /dev/null
+++ b/ui/cypress/fixtures/connect/removeDuplicateRules/expected.csv
@@ -0,0 +1,6 @@
+time;value
+1623871499000;2.0
+1623871500000;3.0
+1623871501000;4.0
+1623871505000;4.0
+1623871507000;6.0
diff --git a/ui/cypress/fixtures/connect/removeDuplicateRules/input.csv b/ui/cypress/fixtures/connect/removeDuplicateRules/input.csv
new file mode 100644
index 0000000..b111f7f
--- /dev/null
+++ b/ui/cypress/fixtures/connect/removeDuplicateRules/input.csv
@@ -0,0 +1,11 @@
+timestamp;value
+1623871499000;2.0
+1623871499000;2.0
+1623871500000;3.0
+1623871501000;4.0
+1623871501000;4.0
+1623871505000;4.0
+1623871505000;4.0
+1623871499000;2.0
+1623871507000;6.0
+1623871507000;6.0
diff --git a/ui/cypress/support/utils/ConnectUtils.ts b/ui/cypress/support/utils/ConnectUtils.ts
index 0f45e99..d457d59 100644
--- a/ui/cypress/support/utils/ConnectUtils.ts
+++ b/ui/cypress/support/utils/ConnectUtils.ts
@@ -206,7 +206,6 @@ export class ConnectUtils {
public static tearDownPreprocessingRuleTest(adapterConfiguration: AdapterInput,
expectedFile: string,
ignoreTime: boolean) {
- ConnectEventSchemaUtils.finishEventSchemaConfiguration();
ConnectUtils.startSetAdapter(adapterConfiguration);
diff --git a/ui/cypress/tests/adapter/schemaRules.smoke.ts b/ui/cypress/tests/adapter/schemaRules.smoke.ts
index 01418dc..cb19b52 100644
--- a/ui/cypress/tests/adapter/schemaRules.smoke.ts
+++ b/ui/cypress/tests/adapter/schemaRules.smoke.ts
@@ -42,6 +42,8 @@ describe('Connect schema rule transformations', () => {
// Add a timestamp property
ConnectEventSchemaUtils.addTimestampProperty();
+ ConnectEventSchemaUtils.finishEventSchemaConfiguration();
+
ConnectUtils.tearDownPreprocessingRuleTest(adapterConfiguration,
'cypress/fixtures/connect/schemaRules/expected.csv',
true);
diff --git a/ui/cypress/tests/adapter/valueRules.smoke.ts b/ui/cypress/tests/adapter/streamRules.smoke.ts
similarity index 51%
copy from ui/cypress/tests/adapter/valueRules.smoke.ts
copy to ui/cypress/tests/adapter/streamRules.smoke.ts
index c73b578..37dbb25 100644
--- a/ui/cypress/tests/adapter/valueRules.smoke.ts
+++ b/ui/cypress/tests/adapter/streamRules.smoke.ts
@@ -20,31 +20,46 @@ import { ConnectUtils } from '../../support/utils/ConnectUtils';
import { FileManagementUtils } from '../../support/utils/FileManagementUtils';
import { ConnectEventSchemaUtils } from '../../support/utils/ConnectEventSchemaUtils';
-describe('Connect value rule transformations', () => {
+describe('Connect aggregation rule transformations', () => {
beforeEach('Setup Test', () => {
cy.initStreamPipesTest();
- FileManagementUtils.addFile('connect/valueRules/input.csv');
+ FileManagementUtils.addFile('connect/aggregationRules/input.csv');
});
it('Perform Test', () => {
const adapterConfiguration = ConnectUtils.setUpPreprocessingRuleTest();
- // Edit timestamp property
- ConnectEventSchemaUtils.editTimestampProperty('timestamp',
- 'yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\'');
- // Number transformation
- ConnectEventSchemaUtils.numberTransformation('value', '10');
-
- // Unit transformation
- ConnectEventSchemaUtils.unitTransformation('temperature',
- 'Degree Celsius',
- 'Degree Fahrenheit');
+ ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
+ ConnectEventSchemaUtils.finishEventSchemaConfiguration();
+ cy.dataCy('connect-reduce-event-rate-box').children().click();
+ cy.dataCy('connect-reduce-event-input').type('3000');
ConnectUtils.tearDownPreprocessingRuleTest(adapterConfiguration,
- 'cypress/fixtures/connect/valueRules/expected.csv',
+ 'cypress/fixtures/connect/aggregationRules/expected.csv',
false);
});
+});
+
+describe('Remove duplicates rule transformations', () => {
+ beforeEach('Setup Test', () => {
+ cy.initStreamPipesTest();
+ FileManagementUtils.addFile('connect/removeDuplicateRules/input.csv');
+ });
+
+ it('Perform Test', () => {
+ const adapterConfiguration = ConnectUtils.setUpPreprocessingRuleTest();
+
+ ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
+ ConnectEventSchemaUtils.finishEventSchemaConfiguration();
+
+ cy.dataCy('connect-remove-duplicates-box').children().click();
+ cy.dataCy('connect-remove-duplicates-input').type('10000');
+
+ ConnectUtils.tearDownPreprocessingRuleTest(adapterConfiguration,
+ 'cypress/fixtures/connect/removeDuplicateRules/expected.csv',
+ false);
+ });
});
diff --git a/ui/cypress/tests/adapter/valueRules.smoke.ts b/ui/cypress/tests/adapter/valueRules.smoke.ts
index c73b578..e4c8f1e 100644
--- a/ui/cypress/tests/adapter/valueRules.smoke.ts
+++ b/ui/cypress/tests/adapter/valueRules.smoke.ts
@@ -42,6 +42,8 @@ describe('Connect value rule transformations', () => {
'Degree Celsius',
'Degree Fahrenheit');
+ ConnectEventSchemaUtils.finishEventSchemaConfiguration();
+
ConnectUtils.tearDownPreprocessingRuleTest(adapterConfiguration,
'cypress/fixtures/connect/valueRules/expected.csv',
false);
diff --git a/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.html b/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.html
index 1c063ea..8f02966 100644
--- a/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.html
+++ b/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.html
@@ -45,22 +45,26 @@
<mat-checkbox [(ngModel)]="removeDuplicates"
- [ngModelOptions]="{standalone: true}">Remove Duplicates
+ [ngModelOptions]="{standalone: true}"
+ data-cy="connect-remove-duplicates-box">Remove Duplicates
</mat-checkbox>
<mat-form-field *ngIf="removeDuplicates" color="accent">
<input matInput id="input-removeDuplicatesTime"
[ngModelOptions]="{standalone: true}" placeholder="Remove Duplicates Time Window"
- [(ngModel)]="removeDuplicatesTime">
+ [(ngModel)]="removeDuplicatesTime"
+ data-cy="connect-remove-duplicates-input">
</mat-form-field>
<mat-checkbox [(ngModel)]="eventRateReduction"
[ngModelOptions]="{standalone: true}"
- matTooltip="Send maximum one event in the specified time window">Reduce the event rate
+ matTooltip="Send maximum one event in the specified time window"
+ data-cy="connect-reduce-event-rate-box">Reduce the event rate
</mat-checkbox>
<mat-form-field *ngIf="eventRateReduction" color="accent">
<input type="number" matInput id="input-evenRateTime"
[ngModelOptions]="{standalone: true}" [(ngModel)]="eventRateTime"
- placeholder="Time Window (Milliseconds)" matTooltipPosition="above">
+ placeholder="Time Window (Milliseconds)" matTooltipPosition="above"
+ data-cy="connect-reduce-event-input">
</mat-form-field>
<mat-form-field *ngIf="eventRateReduction" color="accent">
<mat-label>Event Aggregation</mat-label>
diff --git a/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.ts b/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.ts
index 21ed441..0a37a08 100644
--- a/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.ts
+++ b/ui/src/app/connect/components/start-adapter-configuration/start-adapter-configuration.component.ts
@@ -123,7 +123,7 @@ export class StartAdapterConfigurationComponent implements OnInit {
if (this.eventRateReduction) {
const eventRate: EventRateTransformationRuleDescription = new EventRateTransformationRuleDescription();
eventRate['@class'] = 'org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription';
- eventRate.aggregationTimeWindow = this.eventRateMode as any;
+ eventRate.aggregationTimeWindow = this.eventRateTime;
eventRate.aggregationType = this.eventRateMode;
this.adapterDescription.rules.push(eventRate);
}