You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/01 03:01:56 UTC

[iotdb] 05/06: fix maxReturnSize

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/scanOpBatchProcess1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ceb9331558b142f3d470916372f32e5653d73f5c
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 22:56:58 2022 +0800

    fix maxReturnSize
---
 .../source/AbstractSeriesScanOperator.java         |  10 +-
 .../operator/source/AlignedSeriesScanOperator.java | 180 +--------------------
 .../operator/source/SeriesScanOperator.java        | 174 +-------------------
 3 files changed, 16 insertions(+), 348 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
index ae3191626a..98c3dc7d3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
@@ -44,16 +43,13 @@ public abstract class AbstractSeriesScanOperator implements DataSourceOperator {
   public AbstractSeriesScanOperator(
       PlanNodeId sourceId,
       SeriesScanUtil seriesScanUtil,
-      int subSensorSize,
-      OperatorContext context) {
+      OperatorContext context,
+      long maxReturnSize) {
     this.sourceId = sourceId;
     this.operatorContext = context;
     this.seriesScanUtil = seriesScanUtil;
     this.resultBuilder = seriesScanUtil.getCachedTsBlockBuilder();
-
-    // time + all value columns
-    this.maxReturnSize =
-        (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 9caaeeac60..482bcb5001 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -19,32 +19,14 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.AlignedPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
 import java.util.HashSet;
-import java.util.concurrent.TimeUnit;
 
-public class AlignedSeriesScanOperator implements DataSourceOperator {
-
-  private final OperatorContext operatorContext;
-  private final AlignedSeriesScanUtil seriesScanUtil;
-  private final PlanNodeId sourceId;
-
-  private final TsBlockBuilder builder;
-  private boolean finished = false;
-
-  private final long maxReturnSize;
+public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator {
 
   public AlignedSeriesScanOperator(
       PlanNodeId sourceId,
@@ -53,166 +35,18 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.seriesScanUtil =
+    super(
+        sourceId,
         new AlignedSeriesScanUtil(
             seriesPath,
             new HashSet<>(seriesPath.getMeasurementList()),
             context.getInstanceContext(),
             timeFilter,
             valueFilter,
-            ascending);
-    // time + all value columns
-    this.maxReturnSize =
+            ascending),
+        context,
+        // time + all value columns
         (1L + seriesPath.getMeasurementList().size())
-            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-    this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public TsBlock next() {
-    TsBlock block = builder.build();
-    builder.reset();
-    return block;
-  }
-
-  @Override
-  public boolean hasNext() {
-    try {
-
-      // start stopwatch
-      long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
-      long start = System.nanoTime();
-
-      // here use do-while to promise doing this at least once
-      do {
-        /*
-         * consume page data firstly
-         */
-        if (readPageData()) {
-          continue;
-        }
-
-        /*
-         * consume chunk data secondly
-         */
-        if (readChunkData()) {
-          continue;
-        }
-
-        /*
-         * consume next file finally
-         */
-        if (readFileData()) {
-          continue;
-        }
-        break;
-
-      } while (System.nanoTime() - start < maxRuntime && !builder.isFull());
-
-      finished = builder.isEmpty();
-
-      return !finished;
-    } catch (IOException e) {
-      throw new RuntimeException("Error happened while scanning the file", e);
-    }
-  }
-
-  @Override
-  public boolean isFinished() {
-    return finished;
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return 0L;
-  }
-
-  private boolean readFileData() throws IOException {
-    while (seriesScanUtil.hasNextFile()) {
-      if (readChunkData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      TsBlock tsBlock = seriesScanUtil.nextPage();
-      if (!isEmpty(tsBlock)) {
-        appendToBuilder(tsBlock);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void appendToBuilder(TsBlock tsBlock) {
-    int size = tsBlock.getPositionCount();
-    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
-    TimeColumn timeColumn = tsBlock.getTimeColumn();
-    for (int i = 0; i < size; i++) {
-      timeColumnBuilder.writeLong(timeColumn.getLong(i));
-      builder.declarePosition();
-    }
-    for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount();
-        columnIndex < columnSize;
-        columnIndex++) {
-      ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex);
-      Column column = tsBlock.getColumn(columnIndex);
-      if (column.mayHaveNull()) {
-        for (int i = 0; i < size; i++) {
-          if (column.isNull(i)) {
-            columnBuilder.appendNull();
-          } else {
-            columnBuilder.write(column, i);
-          }
-        }
-      } else {
-        for (int i = 0; i < size; i++) {
-          columnBuilder.write(column, i);
-        }
-      }
-    }
-  }
-
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
-
-  @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
-  }
-
-  @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    seriesScanUtil.initQueryDataSource(dataSource);
+            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index fd831fcd6a..ead57a4943 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -19,33 +19,15 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
-public class SeriesScanOperator implements DataSourceOperator {
-
-  private final OperatorContext operatorContext;
-  private final SeriesScanUtil seriesScanUtil;
-  private final PlanNodeId sourceId;
-  private final TsBlockBuilder builder;
-
-  private boolean finished = false;
-
-  private final long maxReturnSize;
+public class SeriesScanOperator extends AbstractSeriesScanOperator {
 
   public SeriesScanOperator(
       PlanNodeId sourceId,
@@ -56,9 +38,8 @@ public class SeriesScanOperator implements DataSourceOperator {
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.seriesScanUtil =
+    super(
+        sourceId,
         new SeriesScanUtil(
             seriesPath,
             allSensors,
@@ -66,151 +47,8 @@ public class SeriesScanOperator implements DataSourceOperator {
             context.getInstanceContext(),
             timeFilter,
             valueFilter,
-            ascending);
-    this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-    this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public TsBlock next() {
-    TsBlock block = builder.build();
-    builder.reset();
-    return block;
-  }
-
-  @Override
-  public boolean hasNext() {
-    try {
-
-      // start stopwatch
-      long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
-      long start = System.nanoTime();
-
-      // here use do-while to promise doing this at least once
-      do {
-        /*
-         * consume page data firstly
-         */
-        if (readPageData()) {
-          continue;
-        }
-
-        /*
-         * consume chunk data secondly
-         */
-        if (readChunkData()) {
-          continue;
-        }
-
-        /*
-         * consume next file finally
-         */
-        if (readFileData()) {
-          continue;
-        }
-        break;
-
-      } while (System.nanoTime() - start < maxRuntime && !builder.isFull());
-
-      finished = builder.isEmpty();
-
-      return !finished;
-    } catch (IOException e) {
-      throw new RuntimeException("Error happened while scanning the file", e);
-    }
-  }
-
-  @Override
-  public boolean isFinished() {
-    return finished;
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return 0L;
-  }
-
-  private boolean readFileData() throws IOException {
-    while (seriesScanUtil.hasNextFile()) {
-      if (readChunkData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      TsBlock tsBlock = seriesScanUtil.nextPage();
-
-      if (!isEmpty(tsBlock)) {
-        appendToBuilder(tsBlock);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void appendToBuilder(TsBlock tsBlock) {
-    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
-    TimeColumn timeColumn = tsBlock.getTimeColumn();
-    ColumnBuilder columnBuilder = builder.getColumnBuilder(0);
-    Column column = tsBlock.getColumn(0);
-
-    if (column.mayHaveNull()) {
-      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
-        timeColumnBuilder.writeLong(timeColumn.getLong(i));
-        if (column.isNull(i)) {
-          columnBuilder.appendNull();
-        } else {
-          columnBuilder.write(column, i);
-        }
-        builder.declarePosition();
-      }
-    } else {
-      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
-        timeColumnBuilder.writeLong(timeColumn.getLong(i));
-        columnBuilder.write(column, i);
-        builder.declarePosition();
-      }
-    }
-  }
-
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
-
-  @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
-  }
-
-  @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    seriesScanUtil.initQueryDataSource(dataSource);
+            ascending),
+        context,
+        TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
   }
 }