You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/08/03 12:31:07 UTC
[incubator-streampipes] branch dev updated: [hotfix] Fix cypress count test
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new a17570220 [hotfix] Fix cypress count test
a17570220 is described below
commit a1757022006a32088bb8de4dc64d0e1d111b231c
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Aug 3 14:30:58 2022 +0200
[hotfix] Fix cypress count test
---
.../processors/siddhi/count/CountAggregation.java | 15 ++++++++++++---
.../strings.en | 3 +++
.../sinks/internal/jvm/datalake/DataLakeSink.java | 10 ++++++----
.../filters-siddhi/count1/description.json | 5 +++++
.../pipelineElement/filters-siddhi/count1/input.csv | 2 +-
ui/cypress/support/utils/PipelineUtils.ts | 4 ++--
6 files changed, 29 insertions(+), 10 deletions(-)
diff --git a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/count/CountAggregation.java b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/count/CountAggregation.java
index 1842a588c..9bf37f50e 100644
--- a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/count/CountAggregation.java
+++ b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/count/CountAggregation.java
@@ -40,6 +40,7 @@ import org.apache.streampipes.wrapper.standalone.ProcessorParams;
public class CountAggregation extends StreamPipesSiddhiProcessor {
private static final String TIME_WINDOW_KEY = "time-window";
+ private static final String TIMESTAMP_MAPPING_KEY = "timestamp-mapping";
private static final String SCALE_KEY = "scale";
private static final String COUNT_MAPPING = "count-mapping";
@@ -55,8 +56,14 @@ public class CountAggregation extends StreamPipesSiddhiProcessor {
.withLocales(Locales.EN)
.requiredStream(StreamRequirementsBuilder
.create()
- .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
- Labels.withId(COUNT_MAPPING), PropertyScope.DIMENSION_PROPERTY)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_MAPPING_KEY),
+ PropertyScope.NONE)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.anyProperty(),
+ Labels.withId(COUNT_MAPPING),
+ PropertyScope.DIMENSION_PROPERTY)
.build())
.outputStrategy(OutputStrategies.fixed(
EpProperties.timestampProperty("timestamp"),
@@ -76,12 +83,14 @@ public class CountAggregation extends StreamPipesSiddhiProcessor {
Integer timeWindowSize = siddhiParams.getParams().extractor().singleValueParameter(TIME_WINDOW_KEY, Integer.class);
String scale = siddhiParams.getParams().extractor().selectedSingleValueInternalName(SCALE_KEY, String.class);
String fieldToCount = siddhiParams.getParams().extractor().mappingPropertyValue(COUNT_MAPPING);
+ String timestampField = siddhiParams.getParams().extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
+
FromClause fromClause = FromClause.create();
fromClause.add(Expressions.stream(siddhiParams.getInputStreamNames().get(0), Expressions.timeWindow(timeWindowSize, toTimeUnit(scale))));
SelectClause selectClause = SelectClause.create(
- Expressions.as(Expressions.property("currentTimeMillis()"), "timestamp"),
+ Expressions.as(Expressions.property(timestampField), "timestamp"),
Expressions.as(Expressions.property(fieldToCount), "value"),
Expressions.as(Expressions.count(Expressions.property(fieldToCount)), "count"));
diff --git a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.count/strings.en b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.count/strings.en
index ce1c0938e..e5e690a39 100644
--- a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.count/strings.en
+++ b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.count/strings.en
@@ -9,3 +9,6 @@ time-window.description=Size of the time window
scale.title=Time Window Scale
scale.description=
+
+timestamp-mapping.title=Timestamp Field
+timestamp-mapping.description=The value which contains a timestamp
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index c268573ae..a62315b90 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -53,10 +53,12 @@ public class DataLakeSink extends StreamPipesDataSink {
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.INTERNAL)
- .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
- EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_MAPPING_KEY),
- PropertyScope.NONE).build())
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_MAPPING_KEY),
+ PropertyScope.NONE)
+ .build())
.requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
.build();
}
diff --git a/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/description.json b/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/description.json
index 8c17a3893..d7ff167c2 100644
--- a/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/description.json
+++ b/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/description.json
@@ -7,6 +7,11 @@
"selector": "count-mapping",
"value": "text"
},
+ {
+ "type": "drop-down",
+ "selector": "timestamp-mapping",
+ "value": "timestamp"
+ },
{
"type": "radio",
"selector": "scale",
diff --git a/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/input.csv b/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/input.csv
index a9b2ad7a5..c9d251c6c 100644
--- a/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/input.csv
+++ b/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/input.csv
@@ -1,4 +1,4 @@
timestamp;text
1623871499055;hello
1623871503078;world
-1623871503079;hello
+1623871504079;hello
diff --git a/ui/cypress/support/utils/PipelineUtils.ts b/ui/cypress/support/utils/PipelineUtils.ts
index 0bd904dc3..6e2d18e39 100644
--- a/ui/cypress/support/utils/PipelineUtils.ts
+++ b/ui/cypress/support/utils/PipelineUtils.ts
@@ -92,8 +92,8 @@ export class PipelineUtils {
cy.dataCy('sp-editor-pipeline-name').type(pipelineInput.pipelineName);
cy.dataCy('sp-editor-checkbox-start-immediately').children().click();
cy.dataCy('sp-editor-save').click();
- cy.dataCy('sp-pipeline-started-dialog', { timeout: 10000 }).should('be.visible');
- cy.dataCy('sp-pipeline-dialog-close', { timeout: 10000 }).click();
+ cy.dataCy('sp-pipeline-started-dialog', { timeout: 15000 }).should('be.visible');
+ cy.dataCy('sp-pipeline-dialog-close', { timeout: 15000 }).click();
}