You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/01 03:01:51 UTC

[iotdb] branch lmh/scanOpBatchProcess1.0 created (now cf713db5cc)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/scanOpBatchProcess1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at cf713db5cc try to unify

This branch includes the following new commits:

     new fee78afd5b tmp save before run
     new 3bd94b12ec revert config modify
     new 1c97ac2146 fix appendTsBlockToBuilder
     new dda6cec072 rename
     new ceb9331558 fix maxReturnSize
     new cf713db5cc try to unify

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 03/06: fix appendTsBlockToBuilder

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/scanOpBatchProcess1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1c97ac21468a564b3aa1d05597dbf46a1b1af0c1
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 29 20:12:55 2022 +0800

    fix appendTsBlockToBuilder
---
 .../tsfile/read/common/block/TsBlockUtil.java      | 32 ++++++++++++++++++++++
 .../iotdb/tsfile/read/reader/IPageReader.java      | 16 ++---------
 2 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
index 34f1cd42dc..cf52289a3e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
@@ -20,7 +20,10 @@
 package org.apache.iotdb.tsfile.read.common.block;
 
 import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 public class TsBlockUtil {
 
@@ -63,4 +66,33 @@ public class TsBlockUtil {
     }
     return left;
   }
+
+  public static void appendTsBlockToBuilder(TsBlock tsBlock, TsBlockBuilder builder) {
+    int size = tsBlock.getPositionCount();
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    TimeColumn timeColumn = tsBlock.getTimeColumn();
+    for (int i = 0; i < size; i++) {
+      timeColumnBuilder.writeLong(timeColumn.getLong(i));
+      builder.declarePosition();
+    }
+    for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount();
+        columnIndex < columnSize;
+        columnIndex++) {
+      ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex);
+      Column column = tsBlock.getColumn(columnIndex);
+      if (column.mayHaveNull()) {
+        for (int i = 0; i < size; i++) {
+          if (column.isNull(i)) {
+            columnBuilder.appendNull();
+          } else {
+            columnBuilder.write(column, i);
+          }
+        }
+      } else {
+        for (int i = 0; i < size; i++) {
+          columnBuilder.write(column, i);
+        }
+      }
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index a967cae19c..1ba2f30218 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -23,13 +23,13 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockUtil.appendTsBlockToBuilder;
+
 public interface IPageReader {
 
   default BatchData getAllSatisfiedPageData() throws IOException {
@@ -46,19 +46,9 @@ public interface IPageReader {
     if (ascending) {
       writeDataToBuilder(builder);
     } else {
-      TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
-      ColumnBuilder[] valueColumnBuilders = builder.getValueColumnBuilders();
-      int columnNum = valueColumnBuilders.length;
-
       TsBlock tsBlock = getAllSatisfiedData();
       tsBlock.reverse();
-      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
-        timeColumnBuilder.write(tsBlock.getTimeColumn(), i);
-        for (int columnIndex = 0; columnIndex < columnNum; columnIndex++) {
-          valueColumnBuilders[columnIndex].write(tsBlock.getColumn(columnIndex), i);
-        }
-        builder.declarePosition();
-      }
+      appendTsBlockToBuilder(tsBlock, builder);
     }
   }
 


[iotdb] 04/06: rename

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/scanOpBatchProcess1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit dda6cec072bbb5e5cd495a2ab21777a8f06de94f
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 22:48:25 2022 +0800

    rename
---
 .../execution/operator/source/SeriesScanUtil.java  | 55 ++++++++++------------
 .../query/reader/chunk/MemAlignedPageReader.java   |  4 +-
 .../iotdb/db/query/reader/chunk/MemPageReader.java |  4 +-
 .../iotdb/tsfile/read/reader/IPageReader.java      |  7 +--
 .../tsfile/read/reader/page/AlignedPageReader.java |  4 +-
 .../iotdb/tsfile/read/reader/page/PageReader.java  |  4 +-
 6 files changed, 37 insertions(+), 41 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index b667d08858..7d15555852 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -456,39 +456,32 @@ public class SeriesScanUtil {
     return firstPageReader != null;
   }
 
+  /** Return true if there is new data written to the builder */
   public boolean tryToFetchDataFromPage() throws IOException {
 
-    /*
-     * has overlapped data
-     */
+    // has overlapped data
     if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
-      if (tryToBuildFromMergeReader()) {
+      if (appendDataFromMergeReader()) {
         return true;
       }
     }
 
     if (firstPageReader != null) {
-      buildFromPageReader();
+      appendDataFromPageReader();
       return true;
     }
 
-    /*
-     * construct first page reader
-     */
+    // construct first page reader
     if (firstChunkMetadata != null) {
-      /*
-       * try to unpack all overlapped ChunkMetadata to cachedPageReaders
-       */
+      // try to unpack all overlapped ChunkMetadata to cachedPageReaders
       unpackAllOverlappedChunkMetadataToPageReaders(
           orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
     } else {
-      /*
-       * first chunk metadata is already unpacked, consume cached pages
-       */
+      // first chunk metadata is already unpacked, consume cached pages
       initFirstPageReader();
     }
 
-    if (tryToBuildFromOverlappedPage()) {
+    if (tryToFetchDataFromOverlappedPage()) {
       return true;
     }
 
@@ -498,39 +491,41 @@ public class SeriesScanUtil {
 
       initFirstPageReader();
 
-      if (tryToBuildFromOverlappedPage()) {
+      if (tryToFetchDataFromOverlappedPage()) {
         return true;
       }
     }
 
     if (firstPageReader != null) {
-      buildFromPageReader();
+      appendDataFromPageReader();
       return true;
     }
     return false;
   }
 
-  private boolean tryToBuildFromOverlappedPage() throws IOException {
+  /** Return true if there is new data written to the builder */
+  private boolean tryToFetchDataFromOverlappedPage() throws IOException {
     if (firstPageOverlapped()) {
       // next page is overlapped, read overlapped data and cache it
-      return tryToBuildFromMergeReader();
+      return appendDataFromMergeReader();
     }
     return false;
   }
 
-  public void buildFromPageReader() throws IOException {
+  public void appendDataFromPageReader() throws IOException {
     /*
      * next page is not overlapped, push down value filter if it exists
      */
     if (valueFilter != null) {
       firstPageReader.setFilter(valueFilter);
     }
-    firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending(), cachedTsBlockBuilder);
+    firstPageReader.appendPageDataToBuilder(orderUtils.getAscending(), cachedTsBlockBuilder);
     firstPageReader = null;
   }
 
-  private boolean tryToBuildFromMergeReader() throws IOException {
-    int rawSize = cachedTsBlockBuilder.getPositionCount();
+  /** Return true if there is new data written to the builder */
+  private boolean appendDataFromMergeReader() throws IOException {
+    int initialPositionCount = cachedTsBlockBuilder.getPositionCount();
 
     tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
 
@@ -558,7 +553,7 @@ public class SeriesScanUtil {
              * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
              * we could use the first sequence page reader later
              */
-            if (cachedTsBlockBuilder.getPositionCount() > rawSize
+            if (cachedTsBlockBuilder.getPositionCount() > initialPositionCount
                 || firstPageReader != null
                 || !seqPageReaders.isEmpty()) {
               break;
@@ -587,7 +582,7 @@ public class SeriesScanUtil {
                 || (!orderUtils.getAscending()
                     && timeValuePair.getTimestamp()
                         < firstPageReader.getStatistics().getStartTime())) {
-              return cachedTsBlockBuilder.getPositionCount() > rawSize;
+              return cachedTsBlockBuilder.getPositionCount() > initialPositionCount;
             } else if (orderUtils.isOverlapped(
                 timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
               // current timeValuePair is overlapped with firstPageReader, add it to merged reader
@@ -612,7 +607,7 @@ public class SeriesScanUtil {
                 || (!orderUtils.getAscending()
                     && timeValuePair.getTimestamp()
                         < seqPageReaders.get(0).getStatistics().getStartTime())) {
-              return cachedTsBlockBuilder.getPositionCount() > rawSize;
+              return cachedTsBlockBuilder.getPositionCount() > initialPositionCount;
             } else if (orderUtils.isOverlapped(
                 timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
               VersionPageReader pageReader = seqPageReaders.remove(0);
@@ -697,7 +692,7 @@ public class SeriesScanUtil {
         /*
          * if current overlapped page has valid data, return, otherwise read next overlapped page
          */
-        if (cachedTsBlockBuilder.getPositionCount() > rawSize) {
+        if (cachedTsBlockBuilder.getPositionCount() > initialPositionCount) {
           return true;
         } else if (mergeReader.hasNextTimeValuePair()) {
           // condition: seqPage.endTime < mergeReader.currentTime
@@ -1351,7 +1346,7 @@ public class SeriesScanUtil {
     return cachedTsBlockBuilder;
   }
 
-  protected class VersionPageReader {
+  protected static class VersionPageReader {
 
     protected PriorityMergeReader.MergeReaderPriority version;
     protected IPageReader data;
@@ -1394,8 +1389,8 @@ public class SeriesScanUtil {
       return tsBlock;
     }
 
-    void getAllSatisfiedPageData(boolean ascending, TsBlockBuilder builder) throws IOException {
-      data.getAllSatisfiedData(ascending, builder);
+    void appendPageDataToBuilder(boolean ascending, TsBlockBuilder builder) throws IOException {
+      data.appendAllSatisfiedDataToBuilder(ascending, builder);
     }
 
     void setFilter(Filter filter) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
index ae13babf9f..b7bfecc3fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
@@ -88,12 +88,12 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
   @Override
   public TsBlock getAllSatisfiedData() {
     builder.reset();
-    writeDataToBuilder(builder);
+    appendDataToBuilder(builder);
     return builder.build();
   }
 
   @Override
-  public void writeDataToBuilder(TsBlockBuilder builder) {
+  public void appendDataToBuilder(TsBlockBuilder builder) {
     boolean[] satisfyInfo = new boolean[tsBlock.getPositionCount()];
 
     for (int row = 0; row < tsBlock.getPositionCount(); row++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 85f08de1c8..1638d06dfb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -92,12 +92,12 @@ public class MemPageReader implements IPageReader {
   public TsBlock getAllSatisfiedData() {
     TSDataType dataType = chunkMetadata.getDataType();
     TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType));
-    writeDataToBuilder(builder);
+    appendDataToBuilder(builder);
     return builder.build();
   }
 
   @Override
-  public void writeDataToBuilder(TsBlockBuilder builder) {
+  public void appendDataToBuilder(TsBlockBuilder builder) {
     TSDataType dataType = chunkMetadata.getDataType();
     TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
     ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index 1ba2f30218..761e7044a4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -40,11 +40,12 @@ public interface IPageReader {
 
   TsBlock getAllSatisfiedData() throws IOException;
 
-  void writeDataToBuilder(TsBlockBuilder builder) throws IOException;
+  void appendDataToBuilder(TsBlockBuilder builder) throws IOException;
 
-  default void getAllSatisfiedData(boolean ascending, TsBlockBuilder builder) throws IOException {
+  default void appendAllSatisfiedDataToBuilder(boolean ascending, TsBlockBuilder builder)
+      throws IOException {
     if (ascending) {
-      writeDataToBuilder(builder);
+      appendDataToBuilder(builder);
     } else {
       TsBlock tsBlock = getAllSatisfiedData();
       tsBlock.reverse();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index faae955198..61097d3d9e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -116,12 +116,12 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
   @Override
   public TsBlock getAllSatisfiedData() throws IOException {
     builder.reset();
-    writeDataToBuilder(builder);
+    appendDataToBuilder(builder);
     return builder.build();
   }
 
   @Override
-  public void writeDataToBuilder(TsBlockBuilder builder) throws IOException {
+  public void appendDataToBuilder(TsBlockBuilder builder) throws IOException {
     long[] timeBatch = timePageReader.getNextTimeBatch();
 
     // if all the sub sensors' value are null in current row, just discard it
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 190bb11b5c..2fe7f04540 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -161,12 +161,12 @@ public class PageReader implements IPageReader {
   @Override
   public TsBlock getAllSatisfiedData() throws IOException {
     TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType));
-    writeDataToBuilder(builder);
+    appendDataToBuilder(builder);
     return builder.build();
   }
 
   @Override
-  public void writeDataToBuilder(TsBlockBuilder builder) throws IOException {
+  public void appendDataToBuilder(TsBlockBuilder builder) throws IOException {
     TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
     ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
     if (filter == null || filter.satisfy(getStatistics())) {


[iotdb] 05/06: fix maxReturnSize

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/scanOpBatchProcess1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ceb9331558b142f3d470916372f32e5653d73f5c
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 22:56:58 2022 +0800

    fix maxReturnSize
---
 .../source/AbstractSeriesScanOperator.java         |  10 +-
 .../operator/source/AlignedSeriesScanOperator.java | 180 +--------------------
 .../operator/source/SeriesScanOperator.java        | 174 +-------------------
 3 files changed, 16 insertions(+), 348 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
index ae3191626a..98c3dc7d3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
@@ -44,16 +43,13 @@ public abstract class AbstractSeriesScanOperator implements DataSourceOperator {
   public AbstractSeriesScanOperator(
       PlanNodeId sourceId,
       SeriesScanUtil seriesScanUtil,
-      int subSensorSize,
-      OperatorContext context) {
+      OperatorContext context,
+      long maxReturnSize) {
     this.sourceId = sourceId;
     this.operatorContext = context;
     this.seriesScanUtil = seriesScanUtil;
     this.resultBuilder = seriesScanUtil.getCachedTsBlockBuilder();
-
-    // time + all value columns
-    this.maxReturnSize =
-        (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 9caaeeac60..482bcb5001 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -19,32 +19,14 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.AlignedPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
 import java.util.HashSet;
-import java.util.concurrent.TimeUnit;
 
-public class AlignedSeriesScanOperator implements DataSourceOperator {
-
-  private final OperatorContext operatorContext;
-  private final AlignedSeriesScanUtil seriesScanUtil;
-  private final PlanNodeId sourceId;
-
-  private final TsBlockBuilder builder;
-  private boolean finished = false;
-
-  private final long maxReturnSize;
+public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator {
 
   public AlignedSeriesScanOperator(
       PlanNodeId sourceId,
@@ -53,166 +35,18 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.seriesScanUtil =
+    super(
+        sourceId,
         new AlignedSeriesScanUtil(
             seriesPath,
             new HashSet<>(seriesPath.getMeasurementList()),
             context.getInstanceContext(),
             timeFilter,
             valueFilter,
-            ascending);
-    // time + all value columns
-    this.maxReturnSize =
+            ascending),
+        context,
+        // time + all value columns
         (1L + seriesPath.getMeasurementList().size())
-            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-    this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public TsBlock next() {
-    TsBlock block = builder.build();
-    builder.reset();
-    return block;
-  }
-
-  @Override
-  public boolean hasNext() {
-    try {
-
-      // start stopwatch
-      long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
-      long start = System.nanoTime();
-
-      // here use do-while to promise doing this at least once
-      do {
-        /*
-         * consume page data firstly
-         */
-        if (readPageData()) {
-          continue;
-        }
-
-        /*
-         * consume chunk data secondly
-         */
-        if (readChunkData()) {
-          continue;
-        }
-
-        /*
-         * consume next file finally
-         */
-        if (readFileData()) {
-          continue;
-        }
-        break;
-
-      } while (System.nanoTime() - start < maxRuntime && !builder.isFull());
-
-      finished = builder.isEmpty();
-
-      return !finished;
-    } catch (IOException e) {
-      throw new RuntimeException("Error happened while scanning the file", e);
-    }
-  }
-
-  @Override
-  public boolean isFinished() {
-    return finished;
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return 0L;
-  }
-
-  private boolean readFileData() throws IOException {
-    while (seriesScanUtil.hasNextFile()) {
-      if (readChunkData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      TsBlock tsBlock = seriesScanUtil.nextPage();
-      if (!isEmpty(tsBlock)) {
-        appendToBuilder(tsBlock);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void appendToBuilder(TsBlock tsBlock) {
-    int size = tsBlock.getPositionCount();
-    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
-    TimeColumn timeColumn = tsBlock.getTimeColumn();
-    for (int i = 0; i < size; i++) {
-      timeColumnBuilder.writeLong(timeColumn.getLong(i));
-      builder.declarePosition();
-    }
-    for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount();
-        columnIndex < columnSize;
-        columnIndex++) {
-      ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex);
-      Column column = tsBlock.getColumn(columnIndex);
-      if (column.mayHaveNull()) {
-        for (int i = 0; i < size; i++) {
-          if (column.isNull(i)) {
-            columnBuilder.appendNull();
-          } else {
-            columnBuilder.write(column, i);
-          }
-        }
-      } else {
-        for (int i = 0; i < size; i++) {
-          columnBuilder.write(column, i);
-        }
-      }
-    }
-  }
-
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
-
-  @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
-  }
-
-  @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    seriesScanUtil.initQueryDataSource(dataSource);
+            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index fd831fcd6a..ead57a4943 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -19,33 +19,15 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
-public class SeriesScanOperator implements DataSourceOperator {
-
-  private final OperatorContext operatorContext;
-  private final SeriesScanUtil seriesScanUtil;
-  private final PlanNodeId sourceId;
-  private final TsBlockBuilder builder;
-
-  private boolean finished = false;
-
-  private final long maxReturnSize;
+public class SeriesScanOperator extends AbstractSeriesScanOperator {
 
   public SeriesScanOperator(
       PlanNodeId sourceId,
@@ -56,9 +38,8 @@ public class SeriesScanOperator implements DataSourceOperator {
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.seriesScanUtil =
+    super(
+        sourceId,
         new SeriesScanUtil(
             seriesPath,
             allSensors,
@@ -66,151 +47,8 @@ public class SeriesScanOperator implements DataSourceOperator {
             context.getInstanceContext(),
             timeFilter,
             valueFilter,
-            ascending);
-    this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-    this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public TsBlock next() {
-    TsBlock block = builder.build();
-    builder.reset();
-    return block;
-  }
-
-  @Override
-  public boolean hasNext() {
-    try {
-
-      // start stopwatch
-      long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
-      long start = System.nanoTime();
-
-      // here use do-while to promise doing this at least once
-      do {
-        /*
-         * consume page data firstly
-         */
-        if (readPageData()) {
-          continue;
-        }
-
-        /*
-         * consume chunk data secondly
-         */
-        if (readChunkData()) {
-          continue;
-        }
-
-        /*
-         * consume next file finally
-         */
-        if (readFileData()) {
-          continue;
-        }
-        break;
-
-      } while (System.nanoTime() - start < maxRuntime && !builder.isFull());
-
-      finished = builder.isEmpty();
-
-      return !finished;
-    } catch (IOException e) {
-      throw new RuntimeException("Error happened while scanning the file", e);
-    }
-  }
-
-  @Override
-  public boolean isFinished() {
-    return finished;
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return 0L;
-  }
-
-  private boolean readFileData() throws IOException {
-    while (seriesScanUtil.hasNextFile()) {
-      if (readChunkData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      TsBlock tsBlock = seriesScanUtil.nextPage();
-
-      if (!isEmpty(tsBlock)) {
-        appendToBuilder(tsBlock);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void appendToBuilder(TsBlock tsBlock) {
-    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
-    TimeColumn timeColumn = tsBlock.getTimeColumn();
-    ColumnBuilder columnBuilder = builder.getColumnBuilder(0);
-    Column column = tsBlock.getColumn(0);
-
-    if (column.mayHaveNull()) {
-      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
-        timeColumnBuilder.writeLong(timeColumn.getLong(i));
-        if (column.isNull(i)) {
-          columnBuilder.appendNull();
-        } else {
-          columnBuilder.write(column, i);
-        }
-        builder.declarePosition();
-      }
-    } else {
-      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
-        timeColumnBuilder.writeLong(timeColumn.getLong(i));
-        columnBuilder.write(column, i);
-        builder.declarePosition();
-      }
-    }
-  }
-
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
-
-  @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
-  }
-
-  @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    seriesScanUtil.initQueryDataSource(dataSource);
+            ascending),
+        context,
+        TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
   }
 }


[iotdb] 06/06: try to unify

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/scanOpBatchProcess1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cf713db5ccd500175578650a1d1a46fd923fc1ab
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 23:13:20 2022 +0800

    try to unify
---
 .../source/AbstractSeriesScanOperator.java         |   2 +-
 .../execution/operator/source/SeriesScanUtil.java  | 336 ++-------------------
 2 files changed, 33 insertions(+), 305 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
index 98c3dc7d3f..fabb66ebd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
@@ -137,7 +137,7 @@ public abstract class AbstractSeriesScanOperator implements DataSourceOperator {
   }
 
   private boolean readPageData() throws IOException {
-    return seriesScanUtil.tryToFetchDataFromPage();
+    return seriesScanUtil.tryToFetchDataFromPage(true);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 7d15555852..a238c24442 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -55,7 +55,6 @@ import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.function.ToLongFunction;
-import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -118,9 +117,7 @@ public class SeriesScanUtil {
   /*
    * result cache
    */
-  protected boolean hasCachedNextOverlappedPage;
-  protected TsBlock cachedTsBlock;
-
+  protected boolean lastPageOverlapped;
   protected TsBlockBuilder cachedTsBlockBuilder;
 
   public SeriesScanUtil(
@@ -400,64 +397,13 @@ public class SeriesScanUtil {
    * This method should be called after hasNextChunk() until no next page, make sure that all
    * overlapped pages are consumed
    */
-  @SuppressWarnings("squid:S3776")
-  // Suppress high Cognitive Complexity warning
   public boolean hasNextPage() throws IOException {
-
-    /*
-     * has overlapped data before
-     */
-    if (hasCachedNextOverlappedPage) {
-      return true;
-    } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
-      if (hasNextOverlappedPage()) {
-        cachedTsBlock = nextOverlappedPage();
-        if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
-          hasCachedNextOverlappedPage = true;
-          return true;
-        }
-      }
-    }
-
-    if (firstPageReader != null) {
-      return true;
-    }
-
-    /*
-     * construct first page reader
-     */
-    if (firstChunkMetadata != null) {
-      /*
-       * try to unpack all overlapped ChunkMetadata to cachedPageReaders
-       */
-      unpackAllOverlappedChunkMetadataToPageReaders(
-          orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
-    } else {
-      /*
-       * first chunk metadata is already unpacked, consume cached pages
-       */
-      initFirstPageReader();
-    }
-
-    if (isExistOverlappedPage()) {
-      return true;
-    }
-
-    // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page
-    // readers
-    while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
-
-      initFirstPageReader();
-
-      if (isExistOverlappedPage()) {
-        return true;
-      }
-    }
-    return firstPageReader != null;
+    return tryToFetchDataFromPage(false);
   }
 
-  /** Return true if there is new data written to the builder */
-  public boolean tryToFetchDataFromPage() throws IOException {
+  /** @return true if there is new data written to the builder */
+  public boolean tryToFetchDataFromPage(boolean consumePageReader) throws IOException {
+    lastPageOverlapped = false;
 
     // has overlapped data
     if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
@@ -467,7 +413,9 @@ public class SeriesScanUtil {
     }
 
     if (firstPageReader != null) {
-      appendDataFromPageReader();
+      if (consumePageReader) {
+        appendDataFromPageReader();
+      }
       return true;
     }
 
@@ -497,13 +445,15 @@ public class SeriesScanUtil {
     }
 
     if (firstPageReader != null) {
-      appendDataFromPageReader();
+      if (consumePageReader) {
+        appendDataFromPageReader();
+      }
       return true;
     }
     return false;
   }
 
-  /** Return true if there is new data written to the builder */
+  /** @return true if there is new data written to the builder */
   private boolean tryToFetchDataFromOverlappedPage() throws IOException {
     if (firstPageOverlapped()) {
       // next page is overlapped, read overlapped data and cache it
@@ -523,7 +473,12 @@ public class SeriesScanUtil {
     firstPageReader = null;
   }
 
-  /** Return true if there is new data written to the builder */
+  /**
+   * read overlapped data till currentLargestEndTime in mergeReader, if current batch does not
+   * contain data, read till next currentLargestEndTime again.
+   *
+   * @return true if there is new data written to the builder
+   */
   private boolean appendDataFromMergeReader() throws IOException {
     int initialPositionCount = cachedTsBlockBuilder.getPositionCount();
 
@@ -582,7 +537,8 @@ public class SeriesScanUtil {
                 || (!orderUtils.getAscending()
                     && timeValuePair.getTimestamp()
                         < firstPageReader.getStatistics().getStartTime())) {
-              return cachedTsBlockBuilder.getPositionCount() > initialPositionCount;
+              lastPageOverlapped = cachedTsBlockBuilder.getPositionCount() > initialPositionCount;
+              return lastPageOverlapped;
             } else if (orderUtils.isOverlapped(
                 timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
               // current timeValuePair is overlapped with firstPageReader, add it to merged reader
@@ -607,7 +563,8 @@ public class SeriesScanUtil {
                 || (!orderUtils.getAscending()
                     && timeValuePair.getTimestamp()
                         < seqPageReaders.get(0).getStatistics().getStartTime())) {
-              return cachedTsBlockBuilder.getPositionCount() > initialPositionCount;
+              lastPageOverlapped = cachedTsBlockBuilder.getPositionCount() > initialPositionCount;
+              return lastPageOverlapped;
             } else if (orderUtils.isOverlapped(
                 timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
               VersionPageReader pageReader = seqPageReaders.remove(0);
@@ -692,7 +649,8 @@ public class SeriesScanUtil {
         /*
          * if current overlapped page has valid data, return, otherwise read next overlapped page
          */
-        if (cachedTsBlockBuilder.getPositionCount() > initialPositionCount) {
+        lastPageOverlapped = cachedTsBlockBuilder.getPositionCount() > initialPositionCount;
+        if (lastPageOverlapped) {
           return true;
         } else if (mergeReader.hasNextTimeValuePair()) {
           // condition: seqPage.endTime < mergeReader.currentTime
@@ -704,22 +662,6 @@ public class SeriesScanUtil {
     }
   }
 
-  private boolean isExistOverlappedPage() throws IOException {
-    if (firstPageOverlapped()) {
-      /*
-       * next page is overlapped, read overlapped data and cache it
-       */
-      if (hasNextOverlappedPage()) {
-        cachedTsBlock = nextOverlappedPage();
-        if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
-          hasCachedNextOverlappedPage = true;
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
   private boolean firstPageOverlapped() throws IOException {
     if (firstPageReader == null) {
       return false;
@@ -813,17 +755,12 @@ public class SeriesScanUtil {
    * first page is overlapped
    */
   boolean isPageOverlapped() throws IOException {
-
-    /*
-     * has an overlapped page
-     */
-    if (hasCachedNextOverlappedPage) {
+    // has an overlapped page
+    if (lastPageOverlapped) {
       return true;
     }
 
-    /*
-     * has a non-overlapped page in firstPageReader
-     */
+    // has a non-overlapped page in firstPageReader
     if (mergeReader.hasNextTimeValuePair()
         && ((orderUtils.getAscending()
                 && mergeReader.currentTimeValuePair().getTimestamp()
@@ -835,7 +772,6 @@ public class SeriesScanUtil {
     }
 
     Statistics firstPageStatistics = firstPageReader.getStatistics();
-
     return !unSeqPageReaders.isEmpty()
         && orderUtils.isOverlapped(firstPageStatistics, unSeqPageReaders.peek().getStatistics());
   }
@@ -867,207 +803,13 @@ public class SeriesScanUtil {
     firstPageReader = null;
   }
 
-  /** This method should only be used when the method isPageOverlapped() return true. */
   public TsBlock nextPage() throws IOException {
-
-    if (hasCachedNextOverlappedPage) {
-      hasCachedNextOverlappedPage = false;
-      TsBlock res = cachedTsBlock;
-      cachedTsBlock = null;
-      return res;
-    } else {
-
-      /*
-       * next page is not overlapped, push down value filter if it exists
-       */
-      if (valueFilter != null) {
-        firstPageReader.setFilter(valueFilter);
-      }
-      TsBlock tsBlock = firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending());
-      firstPageReader = null;
-
-      return tsBlock;
-    }
-  }
-
-  /**
-   * read overlapped data till currentLargestEndTime in mergeReader, if current batch does not
-   * contain data, read till next currentLargestEndTime again
-   */
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private boolean hasNextOverlappedPage() throws IOException {
-
-    if (hasCachedNextOverlappedPage) {
-      return true;
-    }
-
-    tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
-
-    while (true) {
-
-      // may has overlapped data
-      if (mergeReader.hasNextTimeValuePair()) {
-
-        // TODO we still need to consider data type, ascending and descending here
-        TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
-        TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
-        long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
-        while (mergeReader.hasNextTimeValuePair()) {
-
-          /*
-           * get current first point in mergeReader, this maybe overlapped later
-           */
-          TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
-
-          if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) {
-            /*
-             * when the merged point excesses the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime
-             * 1. has cached batch data, we don't need to read more data, just use the cached data later
-             * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
-             * we could just use the first page reader later
-             * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
-             * we could use the first sequence page reader later
-             */
-            if (!builder.isEmpty() || firstPageReader != null || !seqPageReaders.isEmpty()) {
-              break;
-            }
-            // so, we don't have other data except mergeReader
-            currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
-          }
-
-          // unpack all overlapped data for the first timeValuePair
-          unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
-          unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
-              timeValuePair.getTimestamp(), false);
-          unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
-          unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
-
-          // update if there are unpacked unSeqPageReaders
-          timeValuePair = mergeReader.currentTimeValuePair();
-
-          // from now, the unsequence reader is all unpacked, so we don't need to consider it
-          // we has first page reader now
-          if (firstPageReader != null) {
-            // if current timeValuePair excesses the first page reader's end time, we just use the
-            // cached data
-            if ((orderUtils.getAscending()
-                    && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime())
-                || (!orderUtils.getAscending()
-                    && timeValuePair.getTimestamp()
-                        < firstPageReader.getStatistics().getStartTime())) {
-              hasCachedNextOverlappedPage = !builder.isEmpty();
-              cachedTsBlock = builder.build();
-              return hasCachedNextOverlappedPage;
-            } else if (orderUtils.isOverlapped(
-                timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
-              // current timeValuePair is overlapped with firstPageReader, add it to merged reader
-              // and update endTime to the max end time
-              mergeReader.addReader(
-                  getPointReader(
-                      firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
-                  firstPageReader.version,
-                  orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
-                  context);
-              currentPageEndPointTime =
-                  updateEndPointTime(currentPageEndPointTime, firstPageReader);
-              firstPageReader = null;
-            }
-          }
-
-          // the seq page readers is not empty, just like first page reader
-          if (!seqPageReaders.isEmpty()) {
-            if ((orderUtils.getAscending()
-                    && timeValuePair.getTimestamp()
-                        > seqPageReaders.get(0).getStatistics().getEndTime())
-                || (!orderUtils.getAscending()
-                    && timeValuePair.getTimestamp()
-                        < seqPageReaders.get(0).getStatistics().getStartTime())) {
-              hasCachedNextOverlappedPage = !builder.isEmpty();
-              cachedTsBlock = builder.build();
-              return hasCachedNextOverlappedPage;
-            } else if (orderUtils.isOverlapped(
-                timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
-              VersionPageReader pageReader = seqPageReaders.remove(0);
-              mergeReader.addReader(
-                  getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
-                  pageReader.version,
-                  orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
-                  context);
-              currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader);
-            }
-          }
-
-          /*
-           * get the latest first point in mergeReader
-           */
-          timeValuePair = mergeReader.nextTimeValuePair();
-
-          Object valueForFilter = timeValuePair.getValue().getValue();
-
-          // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will
-          // only accept AlignedPath with only one sub sensor
-          if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
-            for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
-              if (tsPrimitiveType != null) {
-                valueForFilter = tsPrimitiveType.getValue();
-                break;
-              }
-            }
-          }
-
-          if (valueFilter == null
-              || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
-            timeBuilder.writeLong(timeValuePair.getTimestamp());
-            switch (dataType) {
-              case BOOLEAN:
-                builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
-                break;
-              case INT32:
-                builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
-                break;
-              case INT64:
-                builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
-                break;
-              case FLOAT:
-                builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
-                break;
-              case DOUBLE:
-                builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
-                break;
-              case TEXT:
-                builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
-                break;
-              case VECTOR:
-                TsPrimitiveType[] values = timeValuePair.getValue().getVector();
-                for (int i = 0; i < values.length; i++) {
-                  if (values[i] == null) {
-                    builder.getColumnBuilder(i).appendNull();
-                  } else {
-                    builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
-                  }
-                }
-                break;
-              default:
-                throw new UnSupportedDataTypeException(String.valueOf(dataType));
-            }
-            builder.declarePosition();
-          }
-        }
-        hasCachedNextOverlappedPage = !builder.isEmpty();
-        cachedTsBlock = builder.build();
-        /*
-         * if current overlapped page has valid data, return, otherwise read next overlapped page
-         */
-        if (hasCachedNextOverlappedPage) {
-          return true;
-        } else if (mergeReader.hasNextTimeValuePair()) {
-          // condition: seqPage.endTime < mergeReader.currentTime
-          return false;
-        }
-      } else {
-        return false;
-      }
+    if (!lastPageOverlapped) {
+      appendDataFromPageReader();
     }
+    TsBlock cachedTsBlock = cachedTsBlockBuilder.build();
+    cachedTsBlockBuilder.reset();
+    return cachedTsBlock;
   }
 
   private long updateEndPointTime(long currentPageEndPointTime, VersionPageReader pageReader) {
@@ -1176,20 +918,6 @@ public class SeriesScanUtil {
         context);
   }
 
-  private TsBlock nextOverlappedPage() throws IOException {
-    if (hasCachedNextOverlappedPage || hasNextOverlappedPage()) {
-      hasCachedNextOverlappedPage = false;
-      return cachedTsBlock;
-    }
-    throw new IOException("No more batch data");
-  }
-
-  private LinkedList<TsFileResource> sortUnSeqFileResources(List<TsFileResource> tsFileResources) {
-    return tsFileResources.stream()
-        .sorted(orderUtils.comparingLong(tsFileResource -> orderUtils.getOrderTime(tsFileResource)))
-        .collect(Collectors.toCollection(LinkedList::new));
-  }
-
   /**
    * unpack all overlapped seq/unseq files and find the first TimeSeriesMetadata
    *


[iotdb] 01/06: tmp save before run

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/scanOpBatchProcess1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fee78afd5b3b98a40852bd7d15822773abe610c6
Author: liuminghui233 <54...@qq.com>
AuthorDate: Mon Nov 28 10:31:40 2022 +0800

    tmp save before run
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   8 +-
 .../source/AbstractSeriesScanOperator.java         | 156 ++++++++++++
 .../operator/source/AlignedSeriesScanUtil.java     |   2 +
 .../execution/operator/source/SeriesScanUtil.java  | 269 ++++++++++++++++++++-
 .../query/reader/chunk/MemAlignedPageReader.java   |   7 +-
 .../iotdb/db/query/reader/chunk/MemPageReader.java |   8 +-
 .../tsfile/read/common/block/TsBlockBuilder.java   |   2 +-
 .../iotdb/tsfile/read/reader/IPageReader.java      |  25 ++
 .../tsfile/read/reader/page/AlignedPageReader.java |   7 +-
 .../iotdb/tsfile/read/reader/page/PageReader.java  |   7 +-
 10 files changed, 480 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0f923aef2e..be61fc6f91 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -397,13 +397,13 @@ public class IoTDBConfig {
   private int avgSeriesPointNumberThreshold = 100000;
 
   /** Enable inner space compaction for sequence files */
-  private boolean enableSeqSpaceCompaction = true;
+  private boolean enableSeqSpaceCompaction = false;
 
   /** Enable inner space compaction for unsequence files */
-  private boolean enableUnseqSpaceCompaction = true;
+  private boolean enableUnseqSpaceCompaction = false;
 
   /** Compact the unsequence files into the overlapped sequence files */
-  private boolean enableCrossSpaceCompaction = true;
+  private boolean enableCrossSpaceCompaction = false;
 
   /**
    * The strategy of inner space compaction task. There are just one inner space compaction strategy
@@ -560,7 +560,7 @@ public class IoTDBConfig {
   private long cacheFileReaderClearPeriod = 100000;
 
   /** the max executing time of query in ms. Unit: millisecond */
-  private long queryTimeoutThreshold = 60000;
+  private long queryTimeoutThreshold = 600000;
 
   /** the max time to live of a session in ms. Unit: millisecond */
   private int sessionTimeoutThreshold = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
new file mode 100644
index 0000000000..ae3191626a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractSeriesScanOperator implements DataSourceOperator {
+
+  private final OperatorContext operatorContext;
+  private final SeriesScanUtil seriesScanUtil;
+  private final PlanNodeId sourceId;
+
+  private boolean finished = false;
+
+  private final TsBlockBuilder resultBuilder;
+
+  private final long maxReturnSize;
+
+  public AbstractSeriesScanOperator(
+      PlanNodeId sourceId,
+      SeriesScanUtil seriesScanUtil,
+      int subSensorSize,
+      OperatorContext context) {
+    this.sourceId = sourceId;
+    this.operatorContext = context;
+    this.seriesScanUtil = seriesScanUtil;
+    this.resultBuilder = seriesScanUtil.getCachedTsBlockBuilder();
+
+    // time + all value columns
+    this.maxReturnSize =
+        (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() {
+    TsBlock block = resultBuilder.build();
+    resultBuilder.reset();
+    return block;
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      // start stopwatch
+      long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+      long start = System.nanoTime();
+
+      // here use do-while to promise doing this at least once
+      do {
+        // consume page data firstly
+        if (readPageData()) {
+          continue;
+        }
+
+        // consume chunk data secondly
+        if (readChunkData()) {
+          continue;
+        }
+
+        // consume next file finally
+        if (readFileData()) {
+          continue;
+        }
+        break;
+
+      } while (System.nanoTime() - start < maxRuntime && !resultBuilder.isFull());
+
+      finished = resultBuilder.isEmpty();
+      return !finished;
+    } catch (IOException e) {
+      throw new RuntimeException("Error happened while scanning the file", e);
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    return finished;
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+
+  private boolean readFileData() throws IOException {
+    while (seriesScanUtil.hasNextFile()) {
+      if (readChunkData()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readChunkData() throws IOException {
+    while (seriesScanUtil.hasNextChunk()) {
+      if (readPageData()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readPageData() throws IOException {
+    return seriesScanUtil.tryToFetchDataFromPage();
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+
+  @Override
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    seriesScanUtil.initQueryDataSource(dataSource);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
index f41456b063..6cc0689003 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -57,6 +58,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
     dataTypes =
         ((AlignedPath) seriesPath)
             .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
+    cachedTsBlockBuilder = new TsBlockBuilder(dataTypes);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 22fd2cbabc..b667d08858 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -70,7 +70,7 @@ public class SeriesScanUtil {
   protected final TSDataType dataType;
 
   // inner class of SeriesReader for order purpose
-  private TimeOrderUtils orderUtils;
+  private final TimeOrderUtils orderUtils;
 
   /*
    * There is at most one is not null between timeFilter and valueFilter
@@ -121,6 +121,8 @@ public class SeriesScanUtil {
   protected boolean hasCachedNextOverlappedPage;
   protected TsBlock cachedTsBlock;
 
+  protected TsBlockBuilder cachedTsBlockBuilder;
+
   public SeriesScanUtil(
       PartialPath seriesPath,
       Set<String> allSensors,
@@ -156,6 +158,10 @@ public class SeriesScanUtil {
         new PriorityQueue<>(
             orderUtils.comparingLong(
                 versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
+
+    if (dataType != TSDataType.VECTOR) {
+      this.cachedTsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType));
+    }
   }
 
   public void initQueryDataSource(QueryDataSource dataSource) {
@@ -450,6 +456,259 @@ public class SeriesScanUtil {
     return firstPageReader != null;
   }
 
+  public boolean tryToFetchDataFromPage() throws IOException {
+    // Returns true once the merge reader yielded rows or a page was written to the builder.
+    /*
+     * overlapped data is pending: mergeReader still has points or the first page is overlapped
+     */
+    if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
+      if (tryToBuildFromMergeReader()) {
+        return true;
+      }
+    }
+
+    if (firstPageReader != null) {
+      buildFromPageReader();
+      return true;
+    }
+
+    /*
+     * construct first page reader
+     */
+    if (firstChunkMetadata != null) {
+      /*
+       * try to unpack all overlapped ChunkMetadata to cachedPageReaders
+       */
+      unpackAllOverlappedChunkMetadataToPageReaders(
+          orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
+    } else {
+      /*
+       * first chunk metadata is already unpacked, consume cached pages
+       */
+      initFirstPageReader();
+    }
+
+    if (tryToBuildFromOverlappedPage()) {
+      return true;
+    }
+
+    // keep initializing firstPageReader while it is null and either seqPageReaders or
+    // unSeqPageReaders still holds cached page readers
+    while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+
+      initFirstPageReader();
+
+      if (tryToBuildFromOverlappedPage()) {
+        return true;
+      }
+    }
+
+    if (firstPageReader != null) {
+      buildFromPageReader();
+      return true;
+    }
+    return false;
+  }
+
+  private boolean tryToBuildFromOverlappedPage() throws IOException {
+    if (firstPageOverlapped()) {
+      // first page is overlapped: let the merge reader path append rows to cachedTsBlockBuilder
+      return tryToBuildFromMergeReader();
+    }
+    return false; // first page is not overlapped, so nothing was built here
+  }
+
+  public void buildFromPageReader() throws IOException {
+    /*
+     * firstPageReader is dereferenced without a null check here, so callers must check it first;
+     * the page is used directly (not merged), so push down the value filter if it exists
+     */
+    if (valueFilter != null) {
+      firstPageReader.setFilter(valueFilter);
+    }
+    firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending(), cachedTsBlockBuilder);
+    firstPageReader = null; // page has been handed to the builder, drop the reader
+  }
+
+  private boolean tryToBuildFromMergeReader() throws IOException {
+    int rawSize = cachedTsBlockBuilder.getPositionCount(); // rows present before this call
+
+    tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
+
+    while (true) {
+
+      // may have overlapped data
+      if (mergeReader.hasNextTimeValuePair()) {
+
+        // TODO we still need to consider data type, ascending and descending here
+        TimeColumnBuilder timeBuilder = cachedTsBlockBuilder.getTimeColumnBuilder();
+        long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+        while (mergeReader.hasNextTimeValuePair()) {
+
+          /*
+           * get current first point in mergeReader, this may be overlapped later
+           */
+          TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
+
+          if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) {
+            /*
+             * when the merged point exceeds the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime
+             * 1. rows were appended to the builder in this call, we don't need to read more data, just use them later
+             * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
+             * we could just use the first page reader later
+             * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
+             * we could use the first sequence page reader later
+             */
+            if (cachedTsBlockBuilder.getPositionCount() > rawSize
+                || firstPageReader != null
+                || !seqPageReaders.isEmpty()) {
+              break;
+            }
+            // so, we don't have other data except mergeReader
+            currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+          }
+
+          // unpack all overlapped data for the first timeValuePair
+          unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
+          unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+              timeValuePair.getTimestamp(), false);
+          unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
+          unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+
+          // update if there are unpacked unSeqPageReaders
+          timeValuePair = mergeReader.currentTimeValuePair();
+
+          // from now, the unsequence readers are all unpacked, so we don't need to consider them
+          // we may have a first page reader now
+          if (firstPageReader != null) {
+            // if current timeValuePair exceeds the first page reader's end time, we just use the
+            // cached data
+            if ((orderUtils.getAscending()
+                    && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime())
+                || (!orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        < firstPageReader.getStatistics().getStartTime())) {
+              return cachedTsBlockBuilder.getPositionCount() > rawSize;
+            } else if (orderUtils.isOverlapped(
+                timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
+              // current timeValuePair is overlapped with firstPageReader, add it to merged reader
+              // and update endTime to the max end time
+              mergeReader.addReader(
+                  getPointReader(
+                      firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
+                  firstPageReader.version,
+                  orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
+                  context);
+              currentPageEndPointTime =
+                  updateEndPointTime(currentPageEndPointTime, firstPageReader);
+              firstPageReader = null;
+            }
+          }
+
+          // the seq page readers are not empty, handle the first one just like first page reader
+          if (!seqPageReaders.isEmpty()) {
+            if ((orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        > seqPageReaders.get(0).getStatistics().getEndTime())
+                || (!orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        < seqPageReaders.get(0).getStatistics().getStartTime())) {
+              return cachedTsBlockBuilder.getPositionCount() > rawSize;
+            } else if (orderUtils.isOverlapped(
+                timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
+              VersionPageReader pageReader = seqPageReaders.remove(0);
+              mergeReader.addReader(
+                  getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
+                  pageReader.version,
+                  orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
+                  context);
+              currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader);
+            }
+          }
+
+          /*
+           * get the latest first point in mergeReader
+           */
+          timeValuePair = mergeReader.nextTimeValuePair();
+
+          Object valueForFilter = timeValuePair.getValue().getValue();
+
+          // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will
+          // only accept AlignedPath with only one sub sensor
+          if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
+            for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
+              if (tsPrimitiveType != null) {
+                valueForFilter = tsPrimitiveType.getValue();
+                break;
+              }
+            }
+          }
+
+          if (valueFilter == null
+              || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            switch (dataType) {
+              case BOOLEAN:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeBoolean(timeValuePair.getValue().getBoolean());
+                break;
+              case INT32:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeInt(timeValuePair.getValue().getInt());
+                break;
+              case INT64:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeLong(timeValuePair.getValue().getLong());
+                break;
+              case FLOAT:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeFloat(timeValuePair.getValue().getFloat());
+                break;
+              case DOUBLE:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeDouble(timeValuePair.getValue().getDouble());
+                break;
+              case TEXT:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeBinary(timeValuePair.getValue().getBinary());
+                break;
+              case VECTOR:
+                TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+                for (int i = 0; i < values.length; i++) {
+                  if (values[i] == null) {
+                    cachedTsBlockBuilder.getColumnBuilder(i).appendNull();
+                  } else {
+                    cachedTsBlockBuilder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+                  }
+                }
+                break;
+              default:
+                throw new UnSupportedDataTypeException(String.valueOf(dataType));
+            }
+            cachedTsBlockBuilder.declarePosition();
+          }
+        }
+
+        /*
+         * if rows were appended in this call, return, otherwise read next overlapped page
+         */
+        if (cachedTsBlockBuilder.getPositionCount() > rawSize) {
+          return true;
+        } else if (mergeReader.hasNextTimeValuePair()) {
+          // condition: seqPage.endTime < mergeReader.currentTime
+          return false;
+        }
+      } else {
+        return false;
+      }
+    }
+  }
+
   private boolean isExistOverlappedPage() throws IOException {
     if (firstPageOverlapped()) {
       /*
@@ -1088,6 +1347,10 @@ public class SeriesScanUtil {
     return orderUtils;
   }
 
+  public TsBlockBuilder getCachedTsBlockBuilder() {
+    return cachedTsBlockBuilder; // the field itself is returned, no copy is made
+  }
+
   protected class VersionPageReader {
 
     protected PriorityMergeReader.MergeReaderPriority version;
@@ -1131,6 +1394,10 @@ public class SeriesScanUtil {
       return tsBlock;
     }
 
+    void getAllSatisfiedPageData(boolean ascending, TsBlockBuilder builder) throws IOException {
+      data.getAllSatisfiedData(ascending, builder); // hands the supplied builder on; returns nothing
+    }
+
     void setFilter(Filter filter) {
       data.setFilter(filter);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
index 41f4d7c1fd..ae13babf9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
@@ -88,7 +88,12 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
   @Override
   public TsBlock getAllSatisfiedData() {
     builder.reset();
+    writeDataToBuilder(builder); // row filling now lives in writeDataToBuilder
+    return builder.build();
+  }
 
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) {
     boolean[] satisfyInfo = new boolean[tsBlock.getPositionCount()];
 
     for (int row = 0; row < tsBlock.getPositionCount(); row++) {
@@ -130,8 +135,6 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
         }
       }
     }
-
-    return builder.build();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 0baf315eff..85f08de1c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -92,6 +92,13 @@ public class MemPageReader implements IPageReader {
   public TsBlock getAllSatisfiedData() {
     TSDataType dataType = chunkMetadata.getDataType();
     TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType));
+    writeDataToBuilder(builder); // row filling now lives in writeDataToBuilder
+    return builder.build();
+  }
+
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) {
+    TSDataType dataType = chunkMetadata.getDataType();
     TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
     ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
     switch (dataType) {
@@ -164,7 +171,6 @@ public class MemPageReader implements IPageReader {
       default:
         throw new UnSupportedDataTypeException(String.valueOf(dataType));
     }
-    return builder.build();
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
index d4152d4ddb..c309835a09 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -259,7 +259,7 @@ public class TsBlockBuilder {
   }
 
   public boolean isFull() {
-    return declaredPositions == MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull();
+    return declaredPositions >= MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull();
   }
 
   public boolean isEmpty() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index a68f4590b1..a967cae19c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -22,6 +22,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
@@ -37,6 +40,28 @@ public interface IPageReader {
 
   TsBlock getAllSatisfiedData() throws IOException;
 
+  void writeDataToBuilder(TsBlockBuilder builder) throws IOException;
+
+  default void getAllSatisfiedData(boolean ascending, TsBlockBuilder builder) throws IOException {
+    if (ascending) {
+      writeDataToBuilder(builder); // write straight into the caller's builder
+    } else {
+      TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+      ColumnBuilder[] valueColumnBuilders = builder.getValueColumnBuilders();
+      int columnNum = valueColumnBuilders.length;
+
+      TsBlock tsBlock = getAllSatisfiedData(); // build the page as a separate TsBlock first
+      tsBlock.reverse(); // return value unused; presumably reverses in place - TODO confirm
+      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+        timeColumnBuilder.write(tsBlock.getTimeColumn(), i);
+        for (int columnIndex = 0; columnIndex < columnNum; columnIndex++) {
+          valueColumnBuilders[columnIndex].write(tsBlock.getColumn(columnIndex), i);
+        }
+        builder.declarePosition(); // one declared position per copied row
+      }
+    }
+  }
+
   Statistics getStatistics();
 
   void setFilter(Filter filter);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 3f076906bb..faae955198 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -116,6 +116,12 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
   @Override
   public TsBlock getAllSatisfiedData() throws IOException {
     builder.reset();
+    writeDataToBuilder(builder); // row filling now lives in writeDataToBuilder
+    return builder.build();
+  }
+
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) throws IOException {
     long[] timeBatch = timePageReader.getNextTimeBatch();
 
     // if all the sub sensors' value are null in current row, just discard it
@@ -189,7 +195,6 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
         }
       }
     }
-    return builder.build();
   }
 
   public void setDeleteIntervalList(List<List<TimeRange>> list) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index e1fce8ff65..190bb11b5c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -161,6 +161,12 @@ public class PageReader implements IPageReader {
   @Override
   public TsBlock getAllSatisfiedData() throws IOException {
     TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType));
+    writeDataToBuilder(builder); // row filling now lives in writeDataToBuilder
+    return builder.build();
+  }
+
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) throws IOException {
     TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
     ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
     if (filter == null || filter.satisfy(getStatistics())) {
@@ -235,7 +241,6 @@ public class PageReader implements IPageReader {
           throw new UnSupportedDataTypeException(String.valueOf(dataType));
       }
     }
-    return builder.build();
   }
 
   @Override


[iotdb] 02/06: revert config modify

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/scanOpBatchProcess1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3bd94b12ec42f53449910efa9280f8fb2dad8a19
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 29 11:56:33 2022 +0800

    revert config modify
---
 server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index be61fc6f91..0f923aef2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -397,13 +397,13 @@ public class IoTDBConfig {
   private int avgSeriesPointNumberThreshold = 100000;
 
   /** Enable inner space compaction for sequence files */
-  private boolean enableSeqSpaceCompaction = false;
+  private boolean enableSeqSpaceCompaction = true;
 
   /** Enable inner space compaction for unsequence files */
-  private boolean enableUnseqSpaceCompaction = false;
+  private boolean enableUnseqSpaceCompaction = true;
 
   /** Compact the unsequence files into the overlapped sequence files */
-  private boolean enableCrossSpaceCompaction = false;
+  private boolean enableCrossSpaceCompaction = true;
 
   /**
    * The strategy of inner space compaction task. There are just one inner space compaction strategy
@@ -560,7 +560,7 @@ public class IoTDBConfig {
   private long cacheFileReaderClearPeriod = 100000;
 
   /** the max executing time of query in ms. Unit: millisecond */
-  private long queryTimeoutThreshold = 600000;
+  private long queryTimeoutThreshold = 60000;
 
   /** the max time to live of a session in ms. Unit: millisecond */
   private int sessionTimeoutThreshold = 0;