You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/12/16 08:18:05 UTC

[iotdb] branch pushdownlimit created (now ef3550e)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch pushdownlimit
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at ef3550e  push down limit to read task in rawdatasetwithoutValueFilter

This branch includes the following new commits:

     new ef3550e  push down limit to read task in rawdatasetwithoutValueFilter

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: push down limit to read task in rawdatasetwithoutValueFilter

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch pushdownlimit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ef3550ea2b7703774198f43480a9527405634e6d
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Dec 16 16:17:17 2021 +0800

    push down limit to read task in rawdatasetwithoutValueFilter
---
 .../cluster/query/ClusterDataQueryExecutor.java    |  6 +-----
 .../dataset/RawQueryDataSetWithoutValueFilter.java | 25 +++++++++++-----------
 .../UDFRawQueryInputDataSetWithoutValueFilter.java | 11 +++-------
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |  6 +-----
 .../db/query/executor/RawDataQueryExecutor.java    |  6 +-----
 .../query/udf/core/layer/RawQueryInputLayer.java   |  9 +++-----
 6 files changed, 22 insertions(+), 41 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index 64e2292..39ef8f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -85,11 +85,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
     try {
       List<ManagedSeriesReader> readersOfSelectedSeries = initMultSeriesReader(context);
       return new RawQueryDataSetWithoutValueFilter(
-          context.getQueryId(),
-          queryPlan.getDeduplicatedPaths(),
-          queryPlan.getDeduplicatedDataTypes(),
-          readersOfSelectedSeries,
-          queryPlan.isAscending());
+          context.getQueryId(), queryPlan, readersOfSelectedSeries);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new StorageEngineException(e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 52ad327..13e124e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.dataset;
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
 import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -185,18 +186,21 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
   /**
    * constructor of EngineDataSetWithoutValueFilter.
    *
-   * @param paths paths in List structure
-   * @param dataTypes time series data type
    * @param readers readers in List(IPointReader) structure
    */
   public RawQueryDataSetWithoutValueFilter(
-      long queryId,
-      List<PartialPath> paths,
-      List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers,
-      boolean ascending)
+      long queryId, RawDataQueryPlan queryPlan, List<ManagedSeriesReader> readers)
       throws IOException, InterruptedException {
-    super(new ArrayList<>(paths), dataTypes, ascending);
+    super(
+        new ArrayList<>(queryPlan.getDeduplicatedPaths()),
+        queryPlan.getDeduplicatedDataTypes(),
+        queryPlan.isAscending());
+    this.rowLimit = queryPlan.getRowLimit();
+    this.rowOffset = queryPlan.getRowOffset();
+    if (rowLimit != 0) {
+      batchDataLengthList = new int[readers.size()];
+    }
+
     this.queryId = queryId;
     this.seriesReaderList = readers;
     blockingQueueArray = new BlockingQueue[readers.size()];
@@ -206,16 +210,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
     cachedBatchDataArray = new BatchData[readers.size()];
     noMoreDataInQueueArray = new boolean[readers.size()];
     bufferNum = 0;
-    for (PartialPath path : paths) {
+    for (PartialPath path : queryPlan.getDeduplicatedPaths()) {
       if (path instanceof AlignedPath) {
         bufferNum += ((AlignedPath) path).getMeasurementList().size();
       } else {
         bufferNum += 1;
       }
     }
-    if (rowLimit != 0) {
-      batchDataLengthList = new int[readers.size()];
-    }
     init();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java
index f4e4cf8..e50078d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java
@@ -19,9 +19,8 @@
 
 package org.apache.iotdb.db.query.dataset;
 
-import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 import java.util.List;
@@ -30,13 +29,9 @@ public class UDFRawQueryInputDataSetWithoutValueFilter extends RawQueryDataSetWi
     implements IUDFInputDataSet {
 
   public UDFRawQueryInputDataSetWithoutValueFilter(
-      long queryId,
-      List<PartialPath> paths,
-      List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers,
-      boolean ascending)
+      long queryId, UDTFPlan queryPlan, List<ManagedSeriesReader> readers)
       throws IOException, InterruptedException {
-    super(queryId, paths, dataTypes, readers, ascending);
+    super(queryId, queryPlan, readers);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index db533db..52ca43f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -94,11 +94,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
     this.udtfPlan = udtfPlan;
     rawQueryInputLayer =
         new RawQueryInputLayer(
-            queryId,
-            UDF_READER_MEMORY_BUDGET_IN_MB,
-            deduplicatedPaths,
-            deduplicatedDataTypes,
-            readersOfSelectedSeries);
+            queryId, UDF_READER_MEMORY_BUDGET_IN_MB, udtfPlan, readersOfSelectedSeries);
 
     initTransformers();
     initDataSetFields();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 88481d9..09f43cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -68,11 +68,7 @@ public class RawDataQueryExecutor {
 
     try {
       return new RawQueryDataSetWithoutValueFilter(
-          context.getQueryId(),
-          queryPlan.getDeduplicatedPaths(),
-          queryPlan.getDeduplicatedDataTypes(),
-          readersOfSelectedSeries,
-          queryPlan.isAscending());
+          context.getQueryId(), queryPlan, readersOfSelectedSeries);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new StorageEngineException(e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index b60e37c..9fcbb61 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.dataset.IUDFInputDataSet;
 import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithValueFilter;
 import org.apache.iotdb.db.query.dataset.UDFRawQueryInputDataSetWithoutValueFilter;
@@ -47,16 +48,12 @@ public class RawQueryInputLayer {
 
   /** InputLayerWithoutValueFilter */
   public RawQueryInputLayer(
-      long queryId,
-      float memoryBudgetInMB,
-      List<PartialPath> paths,
-      List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers)
+      long queryId, float memoryBudgetInMB, UDTFPlan queryPlan, List<ManagedSeriesReader> readers)
       throws QueryProcessException, IOException, InterruptedException {
     construct(
         queryId,
         memoryBudgetInMB,
-        new UDFRawQueryInputDataSetWithoutValueFilter(queryId, paths, dataTypes, readers, true));
+        new UDFRawQueryInputDataSetWithoutValueFilter(queryId, queryPlan, readers));
   }
 
   /** InputLayerWithValueFilter */