You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2024/03/20 21:05:52 UTC

(streampipes) branch return-timestamp-influxdb created (now ebd7324fa8)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch return-timestamp-influxdb
in repository https://gitbox.apache.org/repos/asf/streampipes.git


      at ebd7324fa8 feat: Improve Influx query handling

This branch includes the following new commits:

     new ebd7324fa8 feat: Improve Influx query handling

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(streampipes) 01/01: feat: Improve Influx query handling

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch return-timestamp-influxdb
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit ebd7324fa8345fdb01a885a8344a72b88e02ef49
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Mar 20 22:02:07 2024 +0100

    feat: Improve Influx query handling
---
 .../influx/DataExplorerInfluxQueryExecutor.java    |  3 +-
 .../param/ProvidedRestQueryParams.java             |  4 ++
 .../query/DataExplorerQueryExecutor.java           |  1 +
 .../query/StreamedQueryResultProvider.java         | 19 +++-----
 .../query/writer/item/ItemGenerator.java           | 10 +++--
 .../streampipes/dataexplorer/utils/TimeParser.java | 50 ----------------------
 6 files changed, 20 insertions(+), 67 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
index 01194fc259..3d97a5110a 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
@@ -33,6 +33,7 @@ import org.influxdb.dto.QueryResult;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Query, QueryResult> {
 
@@ -96,7 +97,7 @@ public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Q
   @Override
   protected QueryResult executeQuery(Query query) {
     try (final InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient()) {
-      return influxDB.query(query);
+      return influxDB.query(query, TimeUnit.MILLISECONDS);
     }
   }
 
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParams.java
index 308bfba3c4..d703eb934b 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParams.java
@@ -65,6 +65,10 @@ public class ProvidedRestQueryParams {
     this.providedParams.put(key, value);
   }
 
+  public void update(String key, long value) {
+    update(key, String.valueOf(value));
+  }
+
   public void update(String key, Integer value) {
     update(key, String.valueOf(value));
   }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java
index c28c949780..b5d187fa8a 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java
@@ -94,6 +94,7 @@ public abstract class DataExplorerQueryExecutor<X, W> {
 
   public SpQueryResult executeQuery(X query,
                                     boolean ignoreMissingValues) {
+    LOG.info("Data Lake Query " + asQueryString(query));
     if (LOG.isDebugEnabled()) {
       LOG.debug("Data Lake Query " + asQueryString(query));
     }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/StreamedQueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/StreamedQueryResultProvider.java
index ee356e9368..3921200e84 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/StreamedQueryResultProvider.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/StreamedQueryResultProvider.java
@@ -32,11 +32,10 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_PAGE;
 
 public class StreamedQueryResultProvider extends QueryResultProvider {
 
-  private static final int MAX_RESULTS_PER_QUERY = 500000;
+  private static final int MAX_RESULTS_PER_QUERY = 200000;
   private static final String TIME_FIELD = "time";
 
   private final OutputFormat format;
@@ -61,36 +60,32 @@ public class StreamedQueryResultProvider extends QueryResultProvider {
     var measurement = findByMeasurementName(queryParams.getMeasurementId()).get();
 
     SpQueryResult dataResult;
-    int page = 0;
-    if (queryParams.has(QP_PAGE)) {
-      page = queryParams.getAsInt(QP_PAGE);
-    }
 
     boolean isFirstDataItem = true;
     configuredWriter.beforeFirstItem(outputStream);
     do {
-      queryParams.update(SupportedRestQueryParams.QP_PAGE, String.valueOf(page));
       dataResult = getData();
 
+      long lastTimestamp = 0;
       if (dataResult.getTotal() > 0) {
         changeTimestampHeader(measurement, dataResult);
         var columns = dataResult.getHeaders();
         for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) {
           configuredWriter.writeItem(outputStream, row, columns, isFirstDataItem);
           isFirstDataItem = false;
+          lastTimestamp = ((Double) row.get(0)).longValue();
         }
       }
-      page++;
-    } while (queryNextPage(dataResult.getTotal(), usesLimit, limit, page));
+      queryParams.update(SupportedRestQueryParams.QP_START_DATE, lastTimestamp + 1);
+    } while (queryNextPage(dataResult.getTotal(), usesLimit, limit));
     configuredWriter.afterLastItem(outputStream);
   }
 
   private boolean queryNextPage(int lastResultsCount,
                                 boolean usesLimit,
-                                int limit,
-                                int lastPage) {
+                                int limit) {
     if (usesLimit) {
-      return !(limit <= (lastPage) * MAX_RESULTS_PER_QUERY);
+      return lastResultsCount >= limit;
     } else {
       return lastResultsCount > 0;
     }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/ItemGenerator.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/ItemGenerator.java
index 7b3aa447fe..31e9eb61d5 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/ItemGenerator.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/ItemGenerator.java
@@ -19,8 +19,6 @@
 
 package org.apache.streampipes.dataexplorer.query.writer.item;
 
-import org.apache.streampipes.dataexplorer.utils.TimeParser;
-
 import java.util.List;
 import java.util.StringJoiner;
 
@@ -39,9 +37,9 @@ public abstract class ItemGenerator {
     StringJoiner joiner = new StringJoiner(separator);
 
     for (int i = 0; i < row.size(); i++) {
-      var value = row.get(i);
+      Object value = row.get(i);
       if (i == 0) {
-        value = TimeParser.parseTime(value.toString());
+        value = getTimestampValue((Double) row.get(i));
       }
       joiner.add(makeItemString(columns.get(i), value));
     }
@@ -54,4 +52,8 @@ public abstract class ItemGenerator {
 
   protected abstract String finalizeItem(String item);
 
+  private long getTimestampValue(Double value) {
+    return value.longValue();
+  }
+
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/TimeParser.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/TimeParser.java
deleted file mode 100644
index 135a5d89df..0000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/TimeParser.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.utils;
-
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.time.temporal.ChronoUnit;
-import java.time.temporal.TemporalAccessor;
-
-public class TimeParser {
-
-  private static final DateTimeFormatter formatter = new DateTimeFormatterBuilder()
-      .appendPattern(
-          "uuuu[-MM[-dd]]['T'HH[:mm[:ss[.SSSSSSSSS][.SSSSSSSS][.SSSSSSS]"
-              + "[.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]]]][XXX]")
-      .parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
-      .parseDefaulting(ChronoField.OFFSET_SECONDS, 0)
-      .toFormatter();
-
-  public static Long parseTime(String v) {
-    TemporalAccessor temporalAccessor = formatter.parseBest(v,
-        ZonedDateTime::from,
-        LocalDateTime::from,
-        LocalDate::from);
-
-    Instant instant = Instant.from(temporalAccessor);
-    return Instant.EPOCH.until(instant, ChronoUnit.MILLIS);
-  }
-}