You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/01 03:01:56 UTC
[iotdb] 05/06: fix maxReturnSize
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/scanOpBatchProcess1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ceb9331558b142f3d470916372f32e5653d73f5c
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 22:56:58 2022 +0800
fix maxReturnSize
---
.../source/AbstractSeriesScanOperator.java | 10 +-
.../operator/source/AlignedSeriesScanOperator.java | 180 +--------------------
.../operator/source/SeriesScanOperator.java | 174 +-------------------
3 files changed, 16 insertions(+), 348 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
index ae3191626a..98c3dc7d3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -44,16 +43,13 @@ public abstract class AbstractSeriesScanOperator implements DataSourceOperator {
public AbstractSeriesScanOperator(
PlanNodeId sourceId,
SeriesScanUtil seriesScanUtil,
- int subSensorSize,
- OperatorContext context) {
+ OperatorContext context,
+ long maxReturnSize) {
this.sourceId = sourceId;
this.operatorContext = context;
this.seriesScanUtil = seriesScanUtil;
this.resultBuilder = seriesScanUtil.getCachedTsBlockBuilder();
-
- // time + all value columns
- this.maxReturnSize =
- (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ this.maxReturnSize = maxReturnSize;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 9caaeeac60..482bcb5001 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -19,32 +19,14 @@
package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.commons.path.AlignedPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import java.io.IOException;
import java.util.HashSet;
-import java.util.concurrent.TimeUnit;
-public class AlignedSeriesScanOperator implements DataSourceOperator {
-
- private final OperatorContext operatorContext;
- private final AlignedSeriesScanUtil seriesScanUtil;
- private final PlanNodeId sourceId;
-
- private final TsBlockBuilder builder;
- private boolean finished = false;
-
- private final long maxReturnSize;
+public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator {
public AlignedSeriesScanOperator(
PlanNodeId sourceId,
@@ -53,166 +35,18 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
Filter timeFilter,
Filter valueFilter,
boolean ascending) {
- this.sourceId = sourceId;
- this.operatorContext = context;
- this.seriesScanUtil =
+ super(
+ sourceId,
new AlignedSeriesScanUtil(
seriesPath,
new HashSet<>(seriesPath.getMeasurementList()),
context.getInstanceContext(),
timeFilter,
valueFilter,
- ascending);
- // time + all value columns
- this.maxReturnSize =
+ ascending),
+ context,
+ // time + all value columns
(1L + seriesPath.getMeasurementList().size())
- * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
- this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
- }
-
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public TsBlock next() {
- TsBlock block = builder.build();
- builder.reset();
- return block;
- }
-
- @Override
- public boolean hasNext() {
- try {
-
- // start stopwatch
- long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
- long start = System.nanoTime();
-
- // here use do-while to promise doing this at least once
- do {
- /*
- * consume page data firstly
- */
- if (readPageData()) {
- continue;
- }
-
- /*
- * consume chunk data secondly
- */
- if (readChunkData()) {
- continue;
- }
-
- /*
- * consume next file finally
- */
- if (readFileData()) {
- continue;
- }
- break;
-
- } while (System.nanoTime() - start < maxRuntime && !builder.isFull());
-
- finished = builder.isEmpty();
-
- return !finished;
- } catch (IOException e) {
- throw new RuntimeException("Error happened while scanning the file", e);
- }
- }
-
- @Override
- public boolean isFinished() {
- return finished;
- }
-
- @Override
- public long calculateMaxPeekMemory() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateMaxReturnSize() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return 0L;
- }
-
- private boolean readFileData() throws IOException {
- while (seriesScanUtil.hasNextFile()) {
- if (readChunkData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readChunkData() throws IOException {
- while (seriesScanUtil.hasNextChunk()) {
- if (readPageData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readPageData() throws IOException {
- while (seriesScanUtil.hasNextPage()) {
- TsBlock tsBlock = seriesScanUtil.nextPage();
- if (!isEmpty(tsBlock)) {
- appendToBuilder(tsBlock);
- return true;
- }
- }
- return false;
- }
-
- private void appendToBuilder(TsBlock tsBlock) {
- int size = tsBlock.getPositionCount();
- TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
- TimeColumn timeColumn = tsBlock.getTimeColumn();
- for (int i = 0; i < size; i++) {
- timeColumnBuilder.writeLong(timeColumn.getLong(i));
- builder.declarePosition();
- }
- for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount();
- columnIndex < columnSize;
- columnIndex++) {
- ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex);
- Column column = tsBlock.getColumn(columnIndex);
- if (column.mayHaveNull()) {
- for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
- columnBuilder.appendNull();
- } else {
- columnBuilder.write(column, i);
- }
- }
- } else {
- for (int i = 0; i < size; i++) {
- columnBuilder.write(column, i);
- }
- }
- }
- }
-
- private boolean isEmpty(TsBlock tsBlock) {
- return tsBlock == null || tsBlock.isEmpty();
- }
-
- @Override
- public PlanNodeId getSourceId() {
- return sourceId;
- }
-
- @Override
- public void initQueryDataSource(QueryDataSource dataSource) {
- seriesScanUtil.initQueryDataSource(dataSource);
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index fd831fcd6a..ead57a4943 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -19,33 +19,15 @@
package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import java.io.IOException;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-public class SeriesScanOperator implements DataSourceOperator {
-
- private final OperatorContext operatorContext;
- private final SeriesScanUtil seriesScanUtil;
- private final PlanNodeId sourceId;
- private final TsBlockBuilder builder;
-
- private boolean finished = false;
-
- private final long maxReturnSize;
+public class SeriesScanOperator extends AbstractSeriesScanOperator {
public SeriesScanOperator(
PlanNodeId sourceId,
@@ -56,9 +38,8 @@ public class SeriesScanOperator implements DataSourceOperator {
Filter timeFilter,
Filter valueFilter,
boolean ascending) {
- this.sourceId = sourceId;
- this.operatorContext = context;
- this.seriesScanUtil =
+ super(
+ sourceId,
new SeriesScanUtil(
seriesPath,
allSensors,
@@ -66,151 +47,8 @@ public class SeriesScanOperator implements DataSourceOperator {
context.getInstanceContext(),
timeFilter,
valueFilter,
- ascending);
- this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
- this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
- }
-
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public TsBlock next() {
- TsBlock block = builder.build();
- builder.reset();
- return block;
- }
-
- @Override
- public boolean hasNext() {
- try {
-
- // start stopwatch
- long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
- long start = System.nanoTime();
-
- // here use do-while to promise doing this at least once
- do {
- /*
- * consume page data firstly
- */
- if (readPageData()) {
- continue;
- }
-
- /*
- * consume chunk data secondly
- */
- if (readChunkData()) {
- continue;
- }
-
- /*
- * consume next file finally
- */
- if (readFileData()) {
- continue;
- }
- break;
-
- } while (System.nanoTime() - start < maxRuntime && !builder.isFull());
-
- finished = builder.isEmpty();
-
- return !finished;
- } catch (IOException e) {
- throw new RuntimeException("Error happened while scanning the file", e);
- }
- }
-
- @Override
- public boolean isFinished() {
- return finished;
- }
-
- @Override
- public long calculateMaxPeekMemory() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateMaxReturnSize() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return 0L;
- }
-
- private boolean readFileData() throws IOException {
- while (seriesScanUtil.hasNextFile()) {
- if (readChunkData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readChunkData() throws IOException {
- while (seriesScanUtil.hasNextChunk()) {
- if (readPageData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readPageData() throws IOException {
- while (seriesScanUtil.hasNextPage()) {
- TsBlock tsBlock = seriesScanUtil.nextPage();
-
- if (!isEmpty(tsBlock)) {
- appendToBuilder(tsBlock);
- return true;
- }
- }
- return false;
- }
-
- private void appendToBuilder(TsBlock tsBlock) {
- TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
- TimeColumn timeColumn = tsBlock.getTimeColumn();
- ColumnBuilder columnBuilder = builder.getColumnBuilder(0);
- Column column = tsBlock.getColumn(0);
-
- if (column.mayHaveNull()) {
- for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
- timeColumnBuilder.writeLong(timeColumn.getLong(i));
- if (column.isNull(i)) {
- columnBuilder.appendNull();
- } else {
- columnBuilder.write(column, i);
- }
- builder.declarePosition();
- }
- } else {
- for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
- timeColumnBuilder.writeLong(timeColumn.getLong(i));
- columnBuilder.write(column, i);
- builder.declarePosition();
- }
- }
- }
-
- private boolean isEmpty(TsBlock tsBlock) {
- return tsBlock == null || tsBlock.isEmpty();
- }
-
- @Override
- public PlanNodeId getSourceId() {
- return sourceId;
- }
-
- @Override
- public void initQueryDataSource(QueryDataSource dataSource) {
- seriesScanUtil.initQueryDataSource(dataSource);
+ ascending),
+ context,
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
}
}