You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/28 02:32:19 UTC
[iotdb] 01/01: tmp save before run
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/scanOpBatchProcess
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 16dfbebf40a23f22c8861010a00d9c48765f4f49
Author: liuminghui233 <54...@qq.com>
AuthorDate: Mon Nov 28 10:31:40 2022 +0800
tmp save before run
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 8 +-
...erator.java => AbstractSeriesScanOperator.java} | 122 ++++------
.../operator/source/AlignedSeriesScanOperator.java | 136 +----------
.../operator/source/AlignedSeriesScanUtil.java | 2 +
.../operator/source/SeriesScanOperator.java | 133 +---------
.../execution/operator/source/SeriesScanUtil.java | 269 ++++++++++++++++++++-
.../query/reader/chunk/MemAlignedPageReader.java | 7 +-
.../iotdb/db/query/reader/chunk/MemPageReader.java | 8 +-
.../tsfile/read/common/block/TsBlockBuilder.java | 2 +-
.../iotdb/tsfile/read/reader/IPageReader.java | 25 ++
.../tsfile/read/reader/page/AlignedPageReader.java | 7 +-
.../iotdb/tsfile/read/reader/page/PageReader.java | 7 +-
12 files changed, 388 insertions(+), 338 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 337f26d487..2a67c0dd7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -397,13 +397,13 @@ public class IoTDBConfig {
private int avgSeriesPointNumberThreshold = 100000;
/** Enable inner space compaction for sequence files */
- private boolean enableSeqSpaceCompaction = true;
+ private boolean enableSeqSpaceCompaction = false;
/** Enable inner space compaction for unsequence files */
- private boolean enableUnseqSpaceCompaction = true;
+ private boolean enableUnseqSpaceCompaction = false;
/** Compact the unsequence files into the overlapped sequence files */
- private boolean enableCrossSpaceCompaction = true;
+ private boolean enableCrossSpaceCompaction = false;
/**
* The strategy of inner space compaction task. There are just one inner space compaction strategy
@@ -558,7 +558,7 @@ public class IoTDBConfig {
private long cacheFileReaderClearPeriod = 100000;
/** the max executing time of query in ms. Unit: millisecond */
- private long queryTimeoutThreshold = 60000;
+ private long queryTimeoutThreshold = 600000;
/** the max time to live of a session in ms. Unit: millisecond */
private int sessionTimeoutThreshold = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
index 05685f758d..ae3191626a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
@@ -16,52 +16,44 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.source;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import java.io.IOException;
-import java.util.Set;
+import java.util.concurrent.TimeUnit;
-public class SeriesScanOperator implements DataSourceOperator {
+public abstract class AbstractSeriesScanOperator implements DataSourceOperator {
private final OperatorContext operatorContext;
private final SeriesScanUtil seriesScanUtil;
private final PlanNodeId sourceId;
- private TsBlock tsBlock;
- private boolean hasCachedTsBlock = false;
+
private boolean finished = false;
+ private final TsBlockBuilder resultBuilder;
+
private final long maxReturnSize;
- public SeriesScanOperator(
+ public AbstractSeriesScanOperator(
PlanNodeId sourceId,
- PartialPath seriesPath,
- Set<String> allSensors,
- TSDataType dataType,
- OperatorContext context,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending) {
+ SeriesScanUtil seriesScanUtil,
+ int subSensorSize,
+ OperatorContext context) {
this.sourceId = sourceId;
this.operatorContext = context;
- this.seriesScanUtil =
- new SeriesScanUtil(
- seriesPath,
- allSensors,
- dataType,
- context.getInstanceContext(),
- timeFilter,
- valueFilter,
- ascending);
- this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ this.seriesScanUtil = seriesScanUtil;
+ this.resultBuilder = seriesScanUtil.getCachedTsBlockBuilder();
+
+ // time + all value columns
+ this.maxReturnSize =
+ (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
}
@Override
@@ -71,49 +63,40 @@ public class SeriesScanOperator implements DataSourceOperator {
@Override
public TsBlock next() {
- if (hasCachedTsBlock || hasNext()) {
- hasCachedTsBlock = false;
- TsBlock res = tsBlock;
- tsBlock = null;
- return res;
- }
- throw new IllegalStateException("no next batch");
+ TsBlock block = resultBuilder.build();
+ resultBuilder.reset();
+ return block;
}
@Override
public boolean hasNext() {
-
try {
- if (hasCachedTsBlock) {
- return true;
- }
-
- /*
- * consume page data firstly
- */
- if (readPageData()) {
- hasCachedTsBlock = true;
- return true;
- }
-
- /*
- * consume chunk data secondly
- */
- if (readChunkData()) {
- hasCachedTsBlock = true;
- return true;
- }
+ // start stopwatch
+ long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+ long start = System.nanoTime();
+
+ // use do-while to guarantee the loop body runs at least once
+ do {
+ // consume page data firstly
+ if (readPageData()) {
+ continue;
+ }
- /*
- * consume next file finally
- */
- while (seriesScanUtil.hasNextFile()) {
+ // consume chunk data secondly
if (readChunkData()) {
- hasCachedTsBlock = true;
- return true;
+ continue;
}
- }
- return hasCachedTsBlock;
+
+ // consume next file finally
+ if (readFileData()) {
+ continue;
+ }
+ break;
+
+ } while (System.nanoTime() - start < maxRuntime && !resultBuilder.isFull());
+
+ finished = resultBuilder.isEmpty();
+ return !finished;
} catch (IOException e) {
throw new RuntimeException("Error happened while scanning the file", e);
}
@@ -121,7 +104,7 @@ public class SeriesScanOperator implements DataSourceOperator {
@Override
public boolean isFinished() {
- return finished || (finished = !hasNext());
+ return finished;
}
@Override
@@ -139,27 +122,26 @@ public class SeriesScanOperator implements DataSourceOperator {
return 0L;
}
- private boolean readChunkData() throws IOException {
- while (seriesScanUtil.hasNextChunk()) {
- if (readPageData()) {
+ private boolean readFileData() throws IOException {
+ while (seriesScanUtil.hasNextFile()) {
+ if (readChunkData()) {
return true;
}
}
return false;
}
- private boolean readPageData() throws IOException {
- while (seriesScanUtil.hasNextPage()) {
- tsBlock = seriesScanUtil.nextPage();
- if (!isEmpty(tsBlock)) {
+ private boolean readChunkData() throws IOException {
+ while (seriesScanUtil.hasNextChunk()) {
+ if (readPageData()) {
return true;
}
}
return false;
}
- private boolean isEmpty(TsBlock tsBlock) {
- return tsBlock == null || tsBlock.isEmpty();
+ private boolean readPageData() throws IOException {
+ return seriesScanUtil.tryToFetchDataFromPage();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 8406437802..1adcce23b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -19,26 +19,13 @@
package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.commons.path.AlignedPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import java.io.IOException;
import java.util.HashSet;
-public class AlignedSeriesScanOperator implements DataSourceOperator {
-
- private final OperatorContext operatorContext;
- private final AlignedSeriesScanUtil seriesScanUtil;
- private final PlanNodeId sourceId;
- private TsBlock tsBlock;
- private boolean hasCachedTsBlock = false;
- private boolean finished = false;
-
- private final long maxReturnSize;
+public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator {
public AlignedSeriesScanOperator(
PlanNodeId sourceId,
@@ -47,127 +34,16 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
Filter timeFilter,
Filter valueFilter,
boolean ascending) {
- this.sourceId = sourceId;
- this.operatorContext = context;
- this.seriesScanUtil =
+ super(
+ sourceId,
new AlignedSeriesScanUtil(
seriesPath,
new HashSet<>(seriesPath.getMeasurementList()),
context.getInstanceContext(),
timeFilter,
valueFilter,
- ascending);
- // time + all value columns
- this.maxReturnSize =
- (1L + seriesPath.getMeasurementList().size())
- * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
- }
-
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public TsBlock next() {
- if (hasCachedTsBlock || hasNext()) {
- hasCachedTsBlock = false;
- TsBlock res = tsBlock;
- tsBlock = null;
- return res;
- }
- throw new IllegalStateException("no next batch");
- }
-
- @Override
- public boolean hasNext() {
-
- try {
- if (hasCachedTsBlock) {
- return true;
- }
-
- /*
- * consume page data firstly
- */
- if (readPageData()) {
- hasCachedTsBlock = true;
- return true;
- }
-
- /*
- * consume chunk data secondly
- */
- if (readChunkData()) {
- hasCachedTsBlock = true;
- return true;
- }
-
- /*
- * consume next file finally
- */
- while (seriesScanUtil.hasNextFile()) {
- if (readChunkData()) {
- hasCachedTsBlock = true;
- return true;
- }
- }
- return hasCachedTsBlock;
- } catch (IOException e) {
- throw new RuntimeException("Error happened while scanning the file", e);
- }
- }
-
- @Override
- public boolean isFinished() {
- return finished || (finished = !hasNext());
- }
-
- @Override
- public long calculateMaxPeekMemory() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateMaxReturnSize() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return 0L;
- }
-
- private boolean readChunkData() throws IOException {
- while (seriesScanUtil.hasNextChunk()) {
- if (readPageData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readPageData() throws IOException {
- while (seriesScanUtil.hasNextPage()) {
- tsBlock = seriesScanUtil.nextPage();
- if (!isEmpty(tsBlock)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean isEmpty(TsBlock tsBlock) {
- return tsBlock == null || tsBlock.isEmpty();
- }
-
- @Override
- public PlanNodeId getSourceId() {
- return sourceId;
- }
-
- @Override
- public void initQueryDataSource(QueryDataSource dataSource) {
- seriesScanUtil.initQueryDataSource(dataSource);
+ ascending),
+ seriesPath.getMeasurementList().size(),
+ context);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
index f41456b063..6cc0689003 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -57,6 +58,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
dataTypes =
((AlignedPath) seriesPath)
.getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
+ cachedTsBlockBuilder = new TsBlockBuilder(dataTypes);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index 05685f758d..491607692b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -19,27 +19,14 @@
package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import java.io.IOException;
import java.util.Set;
-public class SeriesScanOperator implements DataSourceOperator {
-
- private final OperatorContext operatorContext;
- private final SeriesScanUtil seriesScanUtil;
- private final PlanNodeId sourceId;
- private TsBlock tsBlock;
- private boolean hasCachedTsBlock = false;
- private boolean finished = false;
-
- private final long maxReturnSize;
+public class SeriesScanOperator extends AbstractSeriesScanOperator {
public SeriesScanOperator(
PlanNodeId sourceId,
@@ -50,9 +37,8 @@ public class SeriesScanOperator implements DataSourceOperator {
Filter timeFilter,
Filter valueFilter,
boolean ascending) {
- this.sourceId = sourceId;
- this.operatorContext = context;
- this.seriesScanUtil =
+ super(
+ sourceId,
new SeriesScanUtil(
seriesPath,
allSensors,
@@ -60,115 +46,8 @@ public class SeriesScanOperator implements DataSourceOperator {
context.getInstanceContext(),
timeFilter,
valueFilter,
- ascending);
- this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
- }
-
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public TsBlock next() {
- if (hasCachedTsBlock || hasNext()) {
- hasCachedTsBlock = false;
- TsBlock res = tsBlock;
- tsBlock = null;
- return res;
- }
- throw new IllegalStateException("no next batch");
- }
-
- @Override
- public boolean hasNext() {
-
- try {
- if (hasCachedTsBlock) {
- return true;
- }
-
- /*
- * consume page data firstly
- */
- if (readPageData()) {
- hasCachedTsBlock = true;
- return true;
- }
-
- /*
- * consume chunk data secondly
- */
- if (readChunkData()) {
- hasCachedTsBlock = true;
- return true;
- }
-
- /*
- * consume next file finally
- */
- while (seriesScanUtil.hasNextFile()) {
- if (readChunkData()) {
- hasCachedTsBlock = true;
- return true;
- }
- }
- return hasCachedTsBlock;
- } catch (IOException e) {
- throw new RuntimeException("Error happened while scanning the file", e);
- }
- }
-
- @Override
- public boolean isFinished() {
- return finished || (finished = !hasNext());
- }
-
- @Override
- public long calculateMaxPeekMemory() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateMaxReturnSize() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return 0L;
- }
-
- private boolean readChunkData() throws IOException {
- while (seriesScanUtil.hasNextChunk()) {
- if (readPageData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readPageData() throws IOException {
- while (seriesScanUtil.hasNextPage()) {
- tsBlock = seriesScanUtil.nextPage();
- if (!isEmpty(tsBlock)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean isEmpty(TsBlock tsBlock) {
- return tsBlock == null || tsBlock.isEmpty();
- }
-
- @Override
- public PlanNodeId getSourceId() {
- return sourceId;
- }
-
- @Override
- public void initQueryDataSource(QueryDataSource dataSource) {
- seriesScanUtil.initQueryDataSource(dataSource);
+ ascending),
+ 1,
+ context);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 22fd2cbabc..b667d08858 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -70,7 +70,7 @@ public class SeriesScanUtil {
protected final TSDataType dataType;
// inner class of SeriesReader for order purpose
- private TimeOrderUtils orderUtils;
+ private final TimeOrderUtils orderUtils;
/*
* There is at most one is not null between timeFilter and valueFilter
@@ -121,6 +121,8 @@ public class SeriesScanUtil {
protected boolean hasCachedNextOverlappedPage;
protected TsBlock cachedTsBlock;
+ protected TsBlockBuilder cachedTsBlockBuilder;
+
public SeriesScanUtil(
PartialPath seriesPath,
Set<String> allSensors,
@@ -156,6 +158,10 @@ public class SeriesScanUtil {
new PriorityQueue<>(
orderUtils.comparingLong(
versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
+
+ if (dataType != TSDataType.VECTOR) {
+ this.cachedTsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType));
+ }
}
public void initQueryDataSource(QueryDataSource dataSource) {
@@ -450,6 +456,259 @@ public class SeriesScanUtil {
return firstPageReader != null;
}
+ public boolean tryToFetchDataFromPage() throws IOException {
+
+ /*
+ * has overlapped data
+ */
+ if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
+ if (tryToBuildFromMergeReader()) {
+ return true;
+ }
+ }
+
+ if (firstPageReader != null) {
+ buildFromPageReader();
+ return true;
+ }
+
+ /*
+ * construct first page reader
+ */
+ if (firstChunkMetadata != null) {
+ /*
+ * try to unpack all overlapped ChunkMetadata to cachedPageReaders
+ */
+ unpackAllOverlappedChunkMetadataToPageReaders(
+ orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
+ } else {
+ /*
+ * first chunk metadata is already unpacked, consume cached pages
+ */
+ initFirstPageReader();
+ }
+
+ if (tryToBuildFromOverlappedPage()) {
+ return true;
+ }
+
+ // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page
+ // readers
+ while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+
+ initFirstPageReader();
+
+ if (tryToBuildFromOverlappedPage()) {
+ return true;
+ }
+ }
+
+ if (firstPageReader != null) {
+ buildFromPageReader();
+ return true;
+ }
+ return false;
+ }
+
+ private boolean tryToBuildFromOverlappedPage() throws IOException {
+ if (firstPageOverlapped()) {
+ // next page is overlapped, read overlapped data and cache it
+ return tryToBuildFromMergeReader();
+ }
+ return false;
+ }
+
+ public void buildFromPageReader() throws IOException {
+ /*
+ * next page is not overlapped, push down value filter if it exists
+ */
+ if (valueFilter != null) {
+ firstPageReader.setFilter(valueFilter);
+ }
+ firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending(), cachedTsBlockBuilder);
+ firstPageReader = null;
+ }
+
+ private boolean tryToBuildFromMergeReader() throws IOException {
+ int rawSize = cachedTsBlockBuilder.getPositionCount();
+
+ tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
+
+ while (true) {
+
+ // may have overlapped data
+ if (mergeReader.hasNextTimeValuePair()) {
+
+ // TODO we still need to consider data type, ascending and descending here
+ TimeColumnBuilder timeBuilder = cachedTsBlockBuilder.getTimeColumnBuilder();
+ long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+ while (mergeReader.hasNextTimeValuePair()) {
+
+ /*
+ * get current first point in mergeReader, this may be overlapped later
+ */
+ TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
+
+ if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) {
+ /*
+ * when the merged point exceeds the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime
+ * 1. has cached batch data, we don't need to read more data, just use the cached data later
+ * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
+ * we could just use the first page reader later
+ * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
+ * we could use the first sequence page reader later
+ */
+ if (cachedTsBlockBuilder.getPositionCount() > rawSize
+ || firstPageReader != null
+ || !seqPageReaders.isEmpty()) {
+ break;
+ }
+ // so, we don't have other data except mergeReader
+ currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+ }
+
+ // unpack all overlapped data for the first timeValuePair
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
+ unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+ timeValuePair.getTimestamp(), false);
+ unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
+ unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+
+ // update if there are unpacked unSeqPageReaders
+ timeValuePair = mergeReader.currentTimeValuePair();
+
+ // from now on, all unsequence readers are unpacked, so we don't need to consider them
+ // we have a first page reader now
+ if (firstPageReader != null) {
+ // if current timeValuePair exceeds the first page reader's end time, we just use the
+ // cached data
+ if ((orderUtils.getAscending()
+ && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime())
+ || (!orderUtils.getAscending()
+ && timeValuePair.getTimestamp()
+ < firstPageReader.getStatistics().getStartTime())) {
+ return cachedTsBlockBuilder.getPositionCount() > rawSize;
+ } else if (orderUtils.isOverlapped(
+ timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
+ // current timeValuePair is overlapped with firstPageReader, add it to merged reader
+ // and update endTime to the max end time
+ mergeReader.addReader(
+ getPointReader(
+ firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
+ firstPageReader.version,
+ orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
+ context);
+ currentPageEndPointTime =
+ updateEndPointTime(currentPageEndPointTime, firstPageReader);
+ firstPageReader = null;
+ }
+ }
+
+ // the seq page readers are not empty, handle them just like the first page reader
+ if (!seqPageReaders.isEmpty()) {
+ if ((orderUtils.getAscending()
+ && timeValuePair.getTimestamp()
+ > seqPageReaders.get(0).getStatistics().getEndTime())
+ || (!orderUtils.getAscending()
+ && timeValuePair.getTimestamp()
+ < seqPageReaders.get(0).getStatistics().getStartTime())) {
+ return cachedTsBlockBuilder.getPositionCount() > rawSize;
+ } else if (orderUtils.isOverlapped(
+ timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
+ VersionPageReader pageReader = seqPageReaders.remove(0);
+ mergeReader.addReader(
+ getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
+ pageReader.version,
+ orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
+ context);
+ currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader);
+ }
+ }
+
+ /*
+ * get the latest first point in mergeReader
+ */
+ timeValuePair = mergeReader.nextTimeValuePair();
+
+ Object valueForFilter = timeValuePair.getValue().getValue();
+
+ // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will
+ // only accept AlignedPath with only one sub sensor
+ if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
+ for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
+ if (tsPrimitiveType != null) {
+ valueForFilter = tsPrimitiveType.getValue();
+ break;
+ }
+ }
+ }
+
+ if (valueFilter == null
+ || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+ timeBuilder.writeLong(timeValuePair.getTimestamp());
+ switch (dataType) {
+ case BOOLEAN:
+ cachedTsBlockBuilder
+ .getColumnBuilder(0)
+ .writeBoolean(timeValuePair.getValue().getBoolean());
+ break;
+ case INT32:
+ cachedTsBlockBuilder
+ .getColumnBuilder(0)
+ .writeInt(timeValuePair.getValue().getInt());
+ break;
+ case INT64:
+ cachedTsBlockBuilder
+ .getColumnBuilder(0)
+ .writeLong(timeValuePair.getValue().getLong());
+ break;
+ case FLOAT:
+ cachedTsBlockBuilder
+ .getColumnBuilder(0)
+ .writeFloat(timeValuePair.getValue().getFloat());
+ break;
+ case DOUBLE:
+ cachedTsBlockBuilder
+ .getColumnBuilder(0)
+ .writeDouble(timeValuePair.getValue().getDouble());
+ break;
+ case TEXT:
+ cachedTsBlockBuilder
+ .getColumnBuilder(0)
+ .writeBinary(timeValuePair.getValue().getBinary());
+ break;
+ case VECTOR:
+ TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+ for (int i = 0; i < values.length; i++) {
+ if (values[i] == null) {
+ cachedTsBlockBuilder.getColumnBuilder(i).appendNull();
+ } else {
+ cachedTsBlockBuilder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+ }
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
+ cachedTsBlockBuilder.declarePosition();
+ }
+ }
+
+ /*
+ * if current overlapped page has valid data, return, otherwise read next overlapped page
+ */
+ if (cachedTsBlockBuilder.getPositionCount() > rawSize) {
+ return true;
+ } else if (mergeReader.hasNextTimeValuePair()) {
+ // condition: seqPage.endTime < mergeReader.currentTime
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+ }
+
private boolean isExistOverlappedPage() throws IOException {
if (firstPageOverlapped()) {
/*
@@ -1088,6 +1347,10 @@ public class SeriesScanUtil {
return orderUtils;
}
+ public TsBlockBuilder getCachedTsBlockBuilder() {
+ return cachedTsBlockBuilder;
+ }
+
protected class VersionPageReader {
protected PriorityMergeReader.MergeReaderPriority version;
@@ -1131,6 +1394,10 @@ public class SeriesScanUtil {
return tsBlock;
}
+ void getAllSatisfiedPageData(boolean ascending, TsBlockBuilder builder) throws IOException {
+ data.getAllSatisfiedData(ascending, builder);
+ }
+
void setFilter(Filter filter) {
data.setFilter(filter);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
index 41f4d7c1fd..ae13babf9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
@@ -88,7 +88,12 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
@Override
public TsBlock getAllSatisfiedData() {
builder.reset();
+ writeDataToBuilder(builder);
+ return builder.build();
+ }
+ @Override
+ public void writeDataToBuilder(TsBlockBuilder builder) {
boolean[] satisfyInfo = new boolean[tsBlock.getPositionCount()];
for (int row = 0; row < tsBlock.getPositionCount(); row++) {
@@ -130,8 +135,6 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
}
}
}
-
- return builder.build();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 0baf315eff..85f08de1c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -92,6 +92,13 @@ public class MemPageReader implements IPageReader {
public TsBlock getAllSatisfiedData() {
TSDataType dataType = chunkMetadata.getDataType();
TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType));
+ writeDataToBuilder(builder);
+ return builder.build();
+ }
+
+ @Override
+ public void writeDataToBuilder(TsBlockBuilder builder) {
+ TSDataType dataType = chunkMetadata.getDataType();
TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
switch (dataType) {
@@ -164,7 +171,6 @@ public class MemPageReader implements IPageReader {
default:
throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
- return builder.build();
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
index d4152d4ddb..c309835a09 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -259,7 +259,7 @@ public class TsBlockBuilder {
}
public boolean isFull() {
- return declaredPositions == MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull();
+ return declaredPositions >= MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull();
}
public boolean isEmpty() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index a68f4590b1..a967cae19c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -22,6 +22,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
@@ -37,6 +40,28 @@ public interface IPageReader {
TsBlock getAllSatisfiedData() throws IOException;
+ void writeDataToBuilder(TsBlockBuilder builder) throws IOException;
+
+ default void getAllSatisfiedData(boolean ascending, TsBlockBuilder builder) throws IOException {
+ if (ascending) {
+ writeDataToBuilder(builder);
+ } else {
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ ColumnBuilder[] valueColumnBuilders = builder.getValueColumnBuilders();
+ int columnNum = valueColumnBuilders.length;
+
+ TsBlock tsBlock = getAllSatisfiedData();
+ tsBlock.reverse();
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+ timeColumnBuilder.write(tsBlock.getTimeColumn(), i);
+ for (int columnIndex = 0; columnIndex < columnNum; columnIndex++) {
+ valueColumnBuilders[columnIndex].write(tsBlock.getColumn(columnIndex), i);
+ }
+ builder.declarePosition();
+ }
+ }
+ }
+
Statistics getStatistics();
void setFilter(Filter filter);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 3f076906bb..faae955198 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -116,6 +116,12 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
@Override
public TsBlock getAllSatisfiedData() throws IOException {
builder.reset();
+ writeDataToBuilder(builder);
+ return builder.build();
+ }
+
+ @Override
+ public void writeDataToBuilder(TsBlockBuilder builder) throws IOException {
long[] timeBatch = timePageReader.getNextTimeBatch();
// if all the sub sensors' value are null in current row, just discard it
@@ -189,7 +195,6 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
}
}
}
- return builder.build();
}
public void setDeleteIntervalList(List<List<TimeRange>> list) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index e1fce8ff65..190bb11b5c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -161,6 +161,12 @@ public class PageReader implements IPageReader {
@Override
public TsBlock getAllSatisfiedData() throws IOException {
TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType));
+ writeDataToBuilder(builder);
+ return builder.build();
+ }
+
+ @Override
+ public void writeDataToBuilder(TsBlockBuilder builder) throws IOException {
TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
if (filter == null || filter.satisfy(getStatistics())) {
@@ -235,7 +241,6 @@ public class PageReader implements IPageReader {
throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
}
- return builder.build();
}
@Override