You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/08/03 12:31:07 UTC

[incubator-streampipes] branch dev updated: [hotfix] Fix cypress count test

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new a17570220 [hotfix] Fix cypress count test
a17570220 is described below

commit a1757022006a32088bb8de4dc64d0e1d111b231c
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Aug 3 14:30:58 2022 +0200

    [hotfix] Fix cypress count test
---
 .../processors/siddhi/count/CountAggregation.java         | 15 ++++++++++++---
 .../strings.en                                            |  3 +++
 .../sinks/internal/jvm/datalake/DataLakeSink.java         | 10 ++++++----
 .../filters-siddhi/count1/description.json                |  5 +++++
 .../pipelineElement/filters-siddhi/count1/input.csv       |  2 +-
 ui/cypress/support/utils/PipelineUtils.ts                 |  4 ++--
 6 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/count/CountAggregation.java b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/count/CountAggregation.java
index 1842a588c..9bf37f50e 100644
--- a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/count/CountAggregation.java
+++ b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/count/CountAggregation.java
@@ -40,6 +40,7 @@ import org.apache.streampipes.wrapper.standalone.ProcessorParams;
 public class CountAggregation extends StreamPipesSiddhiProcessor {
 
   private static final String TIME_WINDOW_KEY = "time-window";
+  private static final String TIMESTAMP_MAPPING_KEY = "timestamp-mapping";
   private static final String SCALE_KEY = "scale";
   private static final String COUNT_MAPPING = "count-mapping";
 
@@ -55,8 +56,14 @@ public class CountAggregation extends StreamPipesSiddhiProcessor {
             .withLocales(Locales.EN)
             .requiredStream(StreamRequirementsBuilder
                     .create()
-                    .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(),
-                            Labels.withId(COUNT_MAPPING), PropertyScope.DIMENSION_PROPERTY)
+                    .requiredPropertyWithUnaryMapping(
+                      EpRequirements.timestampReq(),
+                      Labels.withId(TIMESTAMP_MAPPING_KEY),
+                      PropertyScope.NONE)
+                    .requiredPropertyWithUnaryMapping(
+                      EpRequirements.anyProperty(),
+                      Labels.withId(COUNT_MAPPING),
+                      PropertyScope.DIMENSION_PROPERTY)
                     .build())
             .outputStrategy(OutputStrategies.fixed(
                     EpProperties.timestampProperty("timestamp"),
@@ -76,12 +83,14 @@ public class CountAggregation extends StreamPipesSiddhiProcessor {
     Integer timeWindowSize = siddhiParams.getParams().extractor().singleValueParameter(TIME_WINDOW_KEY, Integer.class);
     String scale = siddhiParams.getParams().extractor().selectedSingleValueInternalName(SCALE_KEY, String.class);
     String fieldToCount = siddhiParams.getParams().extractor().mappingPropertyValue(COUNT_MAPPING);
+    String timestampField = siddhiParams.getParams().extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
+
 
     FromClause fromClause = FromClause.create();
     fromClause.add(Expressions.stream(siddhiParams.getInputStreamNames().get(0), Expressions.timeWindow(timeWindowSize, toTimeUnit(scale))));
 
     SelectClause selectClause = SelectClause.create(
-            Expressions.as(Expressions.property("currentTimeMillis()"), "timestamp"),
+            Expressions.as(Expressions.property(timestampField), "timestamp"),
             Expressions.as(Expressions.property(fieldToCount), "value"),
             Expressions.as(Expressions.count(Expressions.property(fieldToCount)), "count"));
 
diff --git a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.count/strings.en b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.count/strings.en
index ce1c0938e..e5e690a39 100644
--- a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.count/strings.en
+++ b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.count/strings.en
@@ -9,3 +9,6 @@ time-window.description=Size of the time window
 
 scale.title=Time Window Scale
 scale.description=
+
+timestamp-mapping.title=Timestamp Field
+timestamp-mapping.description=The value which contains a timestamp
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index c268573ae..a62315b90 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -53,10 +53,12 @@ public class DataLakeSink extends StreamPipesDataSink {
           .withLocales(Locales.EN)
           .withAssets(Assets.DOCUMENTATION, Assets.ICON)
           .category(DataSinkType.INTERNAL)
-          .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
-            EpRequirements.timestampReq(),
-            Labels.withId(TIMESTAMP_MAPPING_KEY),
-            PropertyScope.NONE).build())
+          .requiredStream(StreamRequirementsBuilder.create()
+            .requiredPropertyWithUnaryMapping(
+                EpRequirements.timestampReq(),
+                Labels.withId(TIMESTAMP_MAPPING_KEY),
+                PropertyScope.NONE)
+                .build())
           .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
           .build();
     }
diff --git a/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/description.json b/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/description.json
index 8c17a3893..d7ff167c2 100644
--- a/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/description.json
+++ b/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/description.json
@@ -7,6 +7,11 @@
       "selector": "count-mapping",
       "value": "text"
     },
+    {
+      "type": "drop-down",
+      "selector": "timestamp-mapping",
+      "value": "timestamp"
+    },
     {
       "type": "radio",
       "selector": "scale",
diff --git a/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/input.csv b/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/input.csv
index a9b2ad7a5..c9d251c6c 100644
--- a/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/input.csv
+++ b/ui/cypress/fixtures/pipelineElement/filters-siddhi/count1/input.csv
@@ -1,4 +1,4 @@
 timestamp;text
 1623871499055;hello
 1623871503078;world
-1623871503079;hello
+1623871504079;hello
diff --git a/ui/cypress/support/utils/PipelineUtils.ts b/ui/cypress/support/utils/PipelineUtils.ts
index 0bd904dc3..6e2d18e39 100644
--- a/ui/cypress/support/utils/PipelineUtils.ts
+++ b/ui/cypress/support/utils/PipelineUtils.ts
@@ -92,8 +92,8 @@ export class PipelineUtils {
     cy.dataCy('sp-editor-pipeline-name').type(pipelineInput.pipelineName);
     cy.dataCy('sp-editor-checkbox-start-immediately').children().click();
     cy.dataCy('sp-editor-save').click();
-    cy.dataCy('sp-pipeline-started-dialog', { timeout: 10000 }).should('be.visible');
-    cy.dataCy('sp-pipeline-dialog-close', { timeout: 10000 }).click();
+    cy.dataCy('sp-pipeline-started-dialog', { timeout: 15000 }).should('be.visible');
+    cy.dataCy('sp-pipeline-dialog-close', { timeout: 15000 }).click();
   }