You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/08/22 16:53:50 UTC

[incubator-streampipes] branch dev updated (eb5cb6c -> 12000b8)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from eb5cb6c  Merge branch 'dev' of github.com:apache/incubator-streampipes into dev
     new 656e67e  [STREAMPIPES-410] Improve performance of auto-aggregation
     new 12000b8  [STREAMPIPES-410] Refactor line chart widget

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dataexplorer/v4/AutoAggregationHandler.java    | 56 ++++++++++++++--------
 .../v4/params/SelectFromStatementParams.java       |  9 ++--
 .../v4/query/elements/SelectFromStatement.java     |  2 +-
 .../dataexplorer/v4/template/QueryTemplatesV4.java | 26 +++++-----
 .../v4/utils/DataLakeManagementUtils.java          |  2 +-
 .../line-chart/line-chart-widget.component.ts      | 13 +++--
 6 files changed, 67 insertions(+), 41 deletions(-)

[incubator-streampipes] 01/02: [STREAMPIPES-410] Improve performance of auto-aggregation

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 656e67e5d0f669ec31ba0b04e69ed21be2c2b498
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun Aug 22 18:48:58 2021 +0200

    [STREAMPIPES-410] Improve performance of auto-aggregation
---
 .../dataexplorer/v4/AutoAggregationHandler.java    | 56 ++++++++++++++--------
 .../v4/params/SelectFromStatementParams.java       |  9 ++--
 .../v4/query/elements/SelectFromStatement.java     |  2 +-
 .../dataexplorer/v4/template/QueryTemplatesV4.java | 26 +++++-----
 .../v4/utils/DataLakeManagementUtils.java          |  2 +-
 5 files changed, 59 insertions(+), 36 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
index 4a2ecb6..6a69dae 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
@@ -48,21 +48,26 @@ public class AutoAggregationHandler {
 
   public ProvidedQueryParams makeAutoAggregationQueryParams() throws IllegalArgumentException {
     checkAllArgumentsPresent();
-    Integer count = getCount();
+    try {
+    DataResult newest = getSingleRecord(Order.DESC);
+    DataResult oldest = getSingleRecord(Order.ASC);
+    String sampleField = getSampleField(newest);
+    Integer count = getCount(sampleField);
     if (count <= MAX_RETURN_LIMIT) {
       LOG.debug("Auto-Aggregation disabled as {} results <= max return limit {}", count, MAX_RETURN_LIMIT);
       return disableAutoAgg(this.queryParams);
     } else {
       LOG.debug("Performing auto-aggregation");
-      try {
-        int aggValue = getAggregationValue();
+
+        int aggValue = getAggregationValue(newest, oldest);
         LOG.debug("Setting auto-aggregation value to {} ms", aggValue);
         queryParams.update(QP_TIME_INTERVAL, aggValue + "ms");
-      } catch (ParseException e) {
-        e.printStackTrace();
-      }
       return disableAutoAgg(queryParams);
     }
+    } catch (ParseException e) {
+      e.printStackTrace();
+    }
+    return null;
   }
 
   private void checkAllArgumentsPresent() throws IllegalArgumentException {
@@ -76,11 +81,12 @@ public class AutoAggregationHandler {
     return params;
   }
 
-  public Integer getCount() {
+  public Integer getCount(String fieldName) {
     ProvidedQueryParams countParams = disableAutoAgg(new ProvidedQueryParams(queryParams));
     countParams.remove(QP_TIME_INTERVAL);
     countParams.remove(QP_AGGREGATION_FUNCTION);
     countParams.update(QP_COUNT_ONLY, true);
+    countParams.update(QP_COLUMNS, fieldName);
 
     DataResult result = new DataLakeManagementV4().getData(countParams);
     return result.getTotal() > 0 ? ((Double) result.getRows().get(0).get(1)).intValue() : 0;
@@ -90,29 +96,41 @@ public class AutoAggregationHandler {
     return dataLakeManagement.getData(params);
   }
 
-  private int getAggregationValue() throws ParseException {
-    long timerange = getNewestTimestamp() - getOldestTimestamp();
+  private int getAggregationValue(DataResult newest, DataResult oldest) throws ParseException {
+    long timerange = extractTimestamp(newest) - extractTimestamp(oldest);
     double v = timerange / MAX_RETURN_LIMIT;
     return Double.valueOf(v).intValue();
   }
 
-  private long getNewestTimestamp() throws ParseException {
-    return getTimestamp(Order.DESC);
-  }
-
-  private long getOldestTimestamp() throws ParseException {
-    return getTimestamp(Order.ASC);
-  }
+//  private long getNewestTimestamp(DataResult result) throws ParseException {
+//    return extractTimestamp(result);
+//  }
+//
+//  private long getOldestTimestamp(DataResult result) throws ParseException {
+//    return extractTimestamp(result);
+//  }
 
-  private long getTimestamp(Order order) throws ParseException {
+  private DataResult getSingleRecord(Order order) throws ParseException {
     ProvidedQueryParams singleEvent = disableAutoAgg(new ProvidedQueryParams(queryParams));
     singleEvent.remove(QP_AGGREGATION_FUNCTION);
     singleEvent.update(QP_LIMIT, 1);
     singleEvent.update(QP_ORDER, order.toValue());
 
-    DataResult result = fireQuery(singleEvent);
-    int timestampIndex = result.getHeaders().indexOf(TIMESTAMP_FIELD);
+    return fireQuery(singleEvent);
+
+  }
+
+  private String getSampleField(DataResult result) {
+    for (String column : result.getHeaders()) {
+      if (!column.equals(TIMESTAMP_FIELD)) {
+        return column;
+      }
+    }
+    throw new IllegalArgumentException("No columns present");
+  }
 
+  private long extractTimestamp(DataResult result) throws ParseException {
+    int timestampIndex = result.getHeaders().indexOf(TIMESTAMP_FIELD);
     return tryParseDate(result.getRows().get(0).get(timestampIndex).toString()).getTime();
   }
 
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
index f702d15..6790ece 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
@@ -22,8 +22,8 @@ import javax.annotation.Nullable;
 
 public class SelectFromStatementParams extends QueryParamsV4 {
 
-    private final String selectedColumns;
-    private final String aggregationFunction;
+    private String selectedColumns;
+    private String aggregationFunction;
     private boolean countOnly = false;
 
     public static SelectFromStatementParams from(String measurementID,
@@ -33,8 +33,9 @@ public class SelectFromStatementParams extends QueryParamsV4 {
     }
 
     public static SelectFromStatementParams from(String measurementId,
+                                                 String columns,
                                                  boolean countOnly) {
-        return new SelectFromStatementParams(measurementId, countOnly);
+        return new SelectFromStatementParams(measurementId, columns, countOnly);
     }
 
     public SelectFromStatementParams(String measurementID) {
@@ -44,8 +45,10 @@ public class SelectFromStatementParams extends QueryParamsV4 {
     }
 
     public SelectFromStatementParams(String measurementId,
+                                     String columns,
                                      boolean countOnly) {
         this(measurementId);
+        this.selectedColumns = columns;
         this.countOnly = countOnly;
     }
 
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
index 15bc05d..abbe5a5 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
@@ -29,7 +29,7 @@ public class SelectFromStatement extends QueryElement<SelectFromStatementParams>
     @Override
     protected String buildStatement(SelectFromStatementParams selectFromStatementParams) {
         if (selectFromStatementParams.isCountOnly()) {
-            return QueryTemplatesV4.selectCountFrom(selectFromStatementParams.getIndex());
+            return QueryTemplatesV4.selectCountFrom(selectFromStatementParams.getIndex(), selectFromStatementParams.getSelectedColumns());
         } else if (selectFromStatementParams.getAggregationFunction() == null) {
             return QueryTemplatesV4.selectFrom(selectFromStatementParams.getIndex(), selectFromStatementParams.getSelectedColumns());
         } else {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
index 9761c0b..5840736 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
@@ -17,30 +17,32 @@
  */
 package org.apache.streampipes.dataexplorer.v4.template;
 
+import java.util.StringJoiner;
+
 public class QueryTemplatesV4 {
 
     public static String selectFrom(String index, String columns) {
         return "SELECT " + columns + " FROM " + index;
     }
 
-    public static String selectCountFrom(String index) {
-        return "SELECT COUNT(*) FROM " + index;
+    public static String selectCountFrom(String index, String selectedColumns) {
+        return selectAggregationFrom(index, selectedColumns, "COUNT");
     }
 
     public static String selectAggregationFrom(String index, String columns, String aggregationFunction) {
         String[] cols = columns.split(",");
-        StringBuilder statement = new StringBuilder(aggregationFunction + "(" + cols[0] + ")");
-
-        for (int i = 1; i < cols.length; i++) {
-            statement
-                    .append(", ")
-                    .append(aggregationFunction)
-                    .append("(")
-                    .append(cols[i])
-                    .append(")");
+        StringJoiner joiner = new StringJoiner(", ");
+        //StringBuilder statement = new StringBuilder(aggregationFunction + "(" + cols[0] + ")");
+
+        for (int i = 0; i < cols.length; i++) {
+            String builder = aggregationFunction +
+                    "(" +
+                    cols[i] +
+                    ")" + " AS " + cols[i];
+            joiner.add(builder);
         }
 
-        return "SELECT " + statement + " FROM " + index;
+        return "SELECT " + joiner + " FROM " + index;
     }
 
     public static String deleteFrom(String index) {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java
index c8807e8..f084e49 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java
@@ -45,7 +45,7 @@ public class DataLakeManagementUtils {
         String measurementId = params.getMeasurementId();
 
         if (params.has(QP_COUNT_ONLY) && params.getAsBoolean(QP_COUNT_ONLY)) {
-            queryParts.put(SELECT_FROM, SelectFromStatementParams.from(measurementId, true));
+            queryParts.put(SELECT_FROM, SelectFromStatementParams.from(measurementId, params.getAsString(QP_COLUMNS), true));
         } else {
             queryParts.put(SELECT_FROM, SelectFromStatementParams.from(measurementId, params.getAsString(QP_COLUMNS), params.getAsString(QP_AGGREGATION_FUNCTION)));
         }

[incubator-streampipes] 02/02: [STREAMPIPES-410] Refactor line chart widget

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 12000b8ea4e7ac0b59cce7fb62130fc500dc38ee
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun Aug 22 18:49:28 2021 +0200

    [STREAMPIPES-410] Refactor line chart widget
---
 .../widgets/line-chart/line-chart-widget.component.ts       | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/ui/src/app/data-explorer/components/widgets/line-chart/line-chart-widget.component.ts b/ui/src/app/data-explorer/components/widgets/line-chart/line-chart-widget.component.ts
index 36a0aec..8e10045 100644
--- a/ui/src/app/data-explorer/components/widgets/line-chart/line-chart-widget.component.ts
+++ b/ui/src/app/data-explorer/components/widgets/line-chart/line-chart-widget.component.ts
@@ -140,7 +140,6 @@ export class LineChartWidgetComponent extends BaseDataExplorerWidget<LineChartWi
     this.updateAppearance();
 
     super.ngOnInit();
-    this.updateData();
     this.resizeService.resizeSubject.subscribe(info => {
       if (info.gridsterItem.id === this.gridsterItem.id) {
         setTimeout(() => {
@@ -308,6 +307,8 @@ export class LineChartWidgetComponent extends BaseDataExplorerWidget<LineChartWi
   }
 
   transformData(data: DataResult, xKey: string): DataResult {
+    console.log(xKey);
+    console.log(data);
     const columnsContainingNumbers = [];
     const columnsContainingStrings = [];
 
@@ -323,8 +324,8 @@ export class LineChartWidgetComponent extends BaseDataExplorerWidget<LineChartWi
     });
 
     // Get key of timestamp column for x axis
-    const indexXkey = data.headers.findIndex(headerName => headerName === this.dataExplorerWidget.dataConfig.xKey);
-
+    //const indexXkey = data.headers.findIndex(headerName => headerName === this.dataExplorerWidget.dataConfig.xKey);
+    const indexXkey = 0;
 
     const tmpLineChartTraces: any[] = [];
 
@@ -687,14 +688,16 @@ export class LineChartWidgetComponent extends BaseDataExplorerWidget<LineChartWi
   buildQuery(): DatalakeQueryParameters {
     return DatalakeQueryParameterBuilder
         .create(this.timeSettings.startTime, this.timeSettings.endTime)
-        .withAutoAggregation('FIRST')
+        .withAutoAggregation('MEAN')
+        .withColumnFilter(this.dataExplorerWidget.dataConfig.selectedLineChartProperties.map(ep => ep.runtimeName))
         .build();
   }
 
   buildAggregationQuery(): DatalakeQueryParameters {
     return DatalakeQueryParameterBuilder
         .create(this.timeSettings.startTime, this.timeSettings.endTime)
-        .withPaging(1, 100)
+        .withPaging(1, 2000)
+        .withColumnFilter(this.dataExplorerWidget.dataConfig.selectedLineChartProperties.map(ep => ep.runtimeName))
         .withGrouping(undefined, undefined,
             this.dataExplorerWidget.dataConfig.aggregationTimeUnit, this.dataExplorerWidget.dataConfig.aggregationValue)
         .build();