You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/16 01:58:42 UTC
[iotdb] branch master updated: [IOTDB-3778] Use SeriesScanUtil in compaction for point reader (#6668)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 70772cb868 [IOTDB-3778] Use SeriesScanUtil in compaction for point reader (#6668)
70772cb868 is described below
commit 70772cb8684315814959974811440f746e5c19c3
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Sat Jul 16 09:58:37 2022 +0800
[IOTDB-3778] Use SeriesScanUtil in compaction for point reader (#6668)
---
.../rewrite/task/ReadPointPerformerSubTask.java | 19 +-
.../impl/ReadPointCompactionPerformer.java | 56 +-
.../engine/compaction/reader/IDataBlockReader.java | 31 +
.../compaction/reader/SeriesDataBlockReader.java | 156 ++
.../fragment/FragmentInstanceContext.java | 11 +
.../execution/operator/source/SeriesScanUtil.java | 17 +-
.../iotdb/db/query/context/QueryContext.java | 2 +-
.../ReadPointCompactionPerformerTest.java | 1689 ++++++++++----------
8 files changed, 1104 insertions(+), 877 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
index d22362746c..4280d2a6f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
@@ -20,10 +20,10 @@ package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.reader.IDataBlockReader;
import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -46,7 +46,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private final String device;
private final Set<String> measurementList;
- private final QueryContext queryContext;
+ private final FragmentInstanceContext fragmentInstanceContext;
private final QueryDataSource queryDataSource;
private final AbstractCompactionWriter compactionWriter;
private final Map<String, MeasurementSchema> schemaMap;
@@ -55,14 +55,14 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
public ReadPointPerformerSubTask(
String device,
Set<String> measurementList,
- QueryContext queryContext,
+ FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource,
AbstractCompactionWriter compactionWriter,
Map<String, MeasurementSchema> schemaMap,
int taskId) {
this.device = device;
this.measurementList = measurementList;
- this.queryContext = queryContext;
+ this.fragmentInstanceContext = fragmentInstanceContext;
this.queryDataSource = queryDataSource;
this.compactionWriter = compactionWriter;
this.schemaMap = schemaMap;
@@ -74,19 +74,20 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
for (String measurement : measurementList) {
List<IMeasurementSchema> measurementSchemas =
Collections.singletonList(schemaMap.get(measurement));
- IBatchReader dataBatchReader =
+ IDataBlockReader dataBlockReader =
ReadPointCompactionPerformer.constructReader(
device,
Collections.singletonList(measurement),
measurementSchemas,
measurementList,
- queryContext,
+ fragmentInstanceContext,
queryDataSource,
false);
- if (dataBatchReader.hasNextBatch()) {
+ if (dataBlockReader.hasNextBatch()) {
compactionWriter.startMeasurement(measurementSchemas, taskId);
- ReadPointCompactionPerformer.writeWithReader(compactionWriter, dataBatchReader, taskId);
+ ReadPointCompactionPerformer.writeWithReader(
+ compactionWriter, dataBlockReader, taskId, false);
compactionWriter.endMeasurement(taskId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index eecb159196..a47574a7b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.ReadPointPerform
import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.compaction.performer.ICrossCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.performer.IUnseqCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.reader.IDataBlockReader;
+import org.apache.iotdb.db.engine.compaction.reader.SeriesDataBlockReader;
import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter;
@@ -39,19 +41,19 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -106,7 +108,8 @@ public class ReadPointCompactionPerformer
public void perform()
throws IOException, MetadataException, StorageEngineException, InterruptedException {
long queryId = QueryResourceManager.getInstance().assignCompactionQueryId();
- QueryContext queryContext = new QueryContext(queryId);
+ FragmentInstanceContext fragmentInstanceContext =
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId);
QueryDataSource queryDataSource = new QueryDataSource(seqFiles, unseqFiles);
QueryResourceManager.getInstance()
.getQueryFileManager()
@@ -126,10 +129,10 @@ public class ReadPointCompactionPerformer
if (isAligned) {
compactAlignedSeries(
- device, deviceIterator, compactionWriter, queryContext, queryDataSource);
+ device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource);
} else {
compactNonAlignedSeries(
- device, deviceIterator, compactionWriter, queryContext, queryDataSource);
+ device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource);
}
}
@@ -156,7 +159,7 @@ public class ReadPointCompactionPerformer
String device,
MultiTsFileDeviceIterator deviceIterator,
AbstractCompactionWriter compactionWriter,
- QueryContext queryContext,
+ FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource)
throws IOException, MetadataException {
MultiTsFileDeviceIterator.AlignedMeasurementIterator alignedMeasurementIterator =
@@ -171,21 +174,21 @@ public class ReadPointCompactionPerformer
measurementSchemas.stream()
.map(IMeasurementSchema::getMeasurementId)
.collect(Collectors.toList());
- IBatchReader dataBatchReader =
+ IDataBlockReader dataBlockReader =
constructReader(
device,
existedMeasurements,
measurementSchemas,
allMeasurements,
- queryContext,
+ fragmentInstanceContext,
queryDataSource,
true);
- if (dataBatchReader.hasNextBatch()) {
+ if (dataBlockReader.hasNextBatch()) {
// chunkgroup is serialized only when at least one timeseries under this device has data
compactionWriter.startChunkGroup(device, true);
compactionWriter.startMeasurement(measurementSchemas, 0);
- writeWithReader(compactionWriter, dataBatchReader, 0);
+ writeWithReader(compactionWriter, dataBlockReader, 0, true);
compactionWriter.endMeasurement(0);
compactionWriter.endChunkGroup();
}
@@ -195,7 +198,7 @@ public class ReadPointCompactionPerformer
String device,
MultiTsFileDeviceIterator deviceIterator,
AbstractCompactionWriter compactionWriter,
- QueryContext queryContext,
+ FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource)
throws IOException, InterruptedException, IllegalPathException {
MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
@@ -224,7 +227,7 @@ public class ReadPointCompactionPerformer
new ReadPointPerformerSubTask(
device,
measurementsForEachSubTask[i],
- queryContext,
+ fragmentInstanceContext,
queryDataSource,
compactionWriter,
schemaMap,
@@ -341,12 +344,12 @@ public class ReadPointCompactionPerformer
* @param measurementIds if device is aligned, then measurementIds contain all measurements. If
* device is not aligned, then measurementIds only contain one measurement.
*/
- public static IBatchReader constructReader(
+ public static IDataBlockReader constructReader(
String deviceId,
List<String> measurementIds,
List<IMeasurementSchema> measurementSchemas,
Set<String> allSensors,
- QueryContext queryContext,
+ FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource,
boolean isAlign)
throws IllegalPathException {
@@ -359,17 +362,24 @@ public class ReadPointCompactionPerformer
seriesPath = new MeasurementPath(deviceId, measurementIds.get(0), measurementSchemas.get(0));
tsDataType = measurementSchemas.get(0).getType();
}
- return new SeriesRawDataBatchReader(
- seriesPath, allSensors, tsDataType, queryContext, queryDataSource, null, null, null, true);
+ return new SeriesDataBlockReader(
+ seriesPath, allSensors, tsDataType, fragmentInstanceContext, queryDataSource, true);
}
public static void writeWithReader(
- AbstractCompactionWriter writer, IBatchReader reader, int subTaskId) throws IOException {
+ AbstractCompactionWriter writer, IDataBlockReader reader, int subTaskId, boolean isAligned)
+ throws IOException {
while (reader.hasNextBatch()) {
- BatchData batchData = reader.nextBatch();
- while (batchData.hasCurrent()) {
- writer.write(batchData.currentTime(), batchData.currentValue(), subTaskId);
- batchData.next();
+ TsBlock tsBlock = reader.nextBatch();
+ IPointReader pointReader;
+ if (isAligned) {
+ pointReader = tsBlock.getTsBlockAlignedRowIterator();
+ } else {
+ pointReader = tsBlock.getTsBlockSingleColumnIterator();
+ }
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+ writer.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue(), subTaskId);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/IDataBlockReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/IDataBlockReader.java
new file mode 100644
index 0000000000..6f5bd080f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/IDataBlockReader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.reader;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import java.io.IOException;
+
+public interface IDataBlockReader {
+ boolean hasNextBatch() throws IOException;
+
+ TsBlock nextBatch() throws IOException;
+
+ void close() throws IOException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/SeriesDataBlockReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/SeriesDataBlockReader.java
new file mode 100644
index 0000000000..ef42dbe7bf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/SeriesDataBlockReader.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.reader;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanUtil;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanUtil;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class SeriesDataBlockReader implements IDataBlockReader {
+
+ private final SeriesScanUtil seriesScanUtil;
+
+ private TsBlock tsBlock;
+
+ private boolean hasCachedBatchData = false;
+
+ public SeriesDataBlockReader(
+ PartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ FragmentInstanceContext context,
+ QueryDataSource dataSource,
+ boolean ascending) {
+ if (seriesPath instanceof AlignedPath) {
+ this.seriesScanUtil =
+ new AlignedSeriesScanUtil(seriesPath, allSensors, context, null, null, ascending);
+ } else if (seriesPath instanceof MeasurementPath) {
+ this.seriesScanUtil =
+ new SeriesScanUtil(seriesPath, allSensors, dataType, context, null, null, ascending);
+ } else {
+ throw new IllegalArgumentException("Should call exact sub class!");
+ }
+ this.seriesScanUtil.initQueryDataSource(dataSource);
+ }
+
+ @TestOnly
+ public SeriesDataBlockReader(
+ PartialPath seriesPath,
+ TSDataType dataType,
+ FragmentInstanceContext context,
+ List<TsFileResource> seqFileResource,
+ List<TsFileResource> unseqFileResource,
+ boolean ascending) {
+ Set<String> allSensors = new HashSet<>();
+ if (seriesPath instanceof AlignedPath) {
+ this.seriesScanUtil =
+ new AlignedSeriesScanUtil(seriesPath, allSensors, context, null, null, ascending);
+ } else {
+ allSensors.add(seriesPath.getMeasurement());
+ this.seriesScanUtil =
+ new SeriesScanUtil(seriesPath, allSensors, dataType, context, null, null, ascending);
+ }
+ seriesScanUtil.initQueryDataSource(seqFileResource, unseqFileResource);
+ }
+
+ @Override
+ public boolean hasNextBatch() throws IOException {
+
+ if (hasCachedBatchData) {
+ return true;
+ }
+
+ /*
+ * consume page data firstly
+ */
+ if (readPageData()) {
+ hasCachedBatchData = true;
+ return true;
+ }
+
+ /*
+ * consume chunk data secondly
+ */
+ if (readChunkData()) {
+ hasCachedBatchData = true;
+ return true;
+ }
+
+ /*
+ * consume next file finally
+ */
+ while (seriesScanUtil.hasNextFile()) {
+ if (readChunkData()) {
+ hasCachedBatchData = true;
+ return true;
+ }
+ }
+ return hasCachedBatchData;
+ }
+
+ @Override
+ public TsBlock nextBatch() throws IOException {
+ if (hasCachedBatchData || hasNextBatch()) {
+ hasCachedBatchData = false;
+ return tsBlock;
+ }
+ throw new IOException("no next block");
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no resources need to close
+ }
+
+ private boolean readChunkData() throws IOException {
+ while (seriesScanUtil.hasNextChunk()) {
+ if (readPageData()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean readPageData() throws IOException {
+ while (seriesScanUtil.hasNextPage()) {
+ tsBlock = seriesScanUtil.nextPage();
+ if (!isEmpty(tsBlock)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isEmpty(TsBlock tsBlock) {
+ return tsBlock == null || tsBlock.isEmpty();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index ffc9d101b3..088896c34b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -72,6 +72,10 @@ public class FragmentInstanceContext extends QueryContext {
return instanceContext;
}
+ public static FragmentInstanceContext createFragmentInstanceContextForCompaction(long queryId) {
+ return new FragmentInstanceContext(queryId);
+ }
+
private FragmentInstanceContext(
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
this.id = id;
@@ -79,6 +83,13 @@ public class FragmentInstanceContext extends QueryContext {
this.executionEndTime.set(END_TIME_INITIAL_VALUE);
}
+ // used for compaction
+ private FragmentInstanceContext(long queryId) {
+ this.queryId = queryId;
+ this.id = null;
+ this.stateMachine = null;
+ }
+
public void start() {
long now = System.currentTimeMillis();
executionStartTime.compareAndSet(null, now);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 483ef03afa..254693e7b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.idtable.IDTable;
@@ -163,6 +164,14 @@ public class SeriesScanUtil {
orderUtils.setCurSeqFileIndex(dataSource);
}
+ @TestOnly
+ public void initQueryDataSource(
+ List<TsFileResource> seqFileResource, List<TsFileResource> unseqFileResource) {
+ dataSource = new QueryDataSource(seqFileResource, unseqFileResource);
+ QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending());
+ orderUtils.setCurSeqFileIndex(dataSource);
+ }
+
protected PriorityMergeReader getPriorityMergeReader() {
return new PriorityMergeReader();
}
@@ -175,7 +184,7 @@ public class SeriesScanUtil {
return !(hasNextPage() || hasNextChunk() || hasNextFile());
}
- boolean hasNextFile() throws IOException {
+ public boolean hasNextFile() throws IOException {
if (!unSeqPageReaders.isEmpty()
|| firstPageReader != null
@@ -250,7 +259,7 @@ public class SeriesScanUtil {
* This method should be called after hasNextFile() until no next chunk, make sure that all
* overlapped chunks are consumed
*/
- boolean hasNextChunk() throws IOException {
+ public boolean hasNextChunk() throws IOException {
if (!unSeqPageReaders.isEmpty()
|| firstPageReader != null
@@ -383,7 +392,7 @@ public class SeriesScanUtil {
*/
@SuppressWarnings("squid:S3776")
// Suppress high Cognitive Complexity warning
- boolean hasNextPage() throws IOException {
+ public boolean hasNextPage() throws IOException {
/*
* has overlapped data before
@@ -601,7 +610,7 @@ public class SeriesScanUtil {
}
/** This method should only be used when the method isPageOverlapped() return true. */
- TsBlock nextPage() throws IOException {
+ public TsBlock nextPage() throws IOException {
if (hasCachedNextOverlappedPage) {
hasCachedNextOverlappedPage = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
index f8de63034c..5ce05c1816 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -48,7 +48,7 @@ public class QueryContext {
*/
private final Map<String, List<Modification>> fileModCache = new HashMap<>();
- private long queryId;
+ protected long queryId;
private long queryTimeLowerBound = Long.MIN_VALUE;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index c0064917f8..f0ee6155a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -24,14 +24,16 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer;
import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.reader.IDataBlockReader;
+import org.apache.iotdb.db.engine.compaction.reader.SeriesDataBlockReader;
import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -41,8 +43,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -99,26 +101,27 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d1",
"s1",
new MeasurementSchema("s1", TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+
+ tsBlockReader.close();
assertEquals(500, count);
List<TsFileResource> targetResources =
@@ -129,26 +132,28 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
- tsFilesReader =
- new SeriesRawDataBatchReader(
+ tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
+
count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+
+ tsBlockReader.close();
assertEquals(500, count);
}
@@ -175,30 +180,31 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() >= 600) {
- assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() >= 600) {
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+
+ tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(400, count);
} else if (i < 3) {
@@ -240,30 +246,31 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() >= 600) {
- assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() >= 600) {
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(400, count);
} else if (i < 3 && j < 5) {
@@ -290,26 +297,26 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s1",
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(500, count);
}
}
@@ -329,26 +336,26 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s1",
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(500, count);
}
}
@@ -380,34 +387,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if ((100 <= batchData.currentTime() && batchData.currentTime() < 170)
- || (270 <= batchData.currentTime() && batchData.currentTime() < 340)) {
- assertEquals(batchData.currentTime() + 200, batchData.currentValue());
- } else if ((200 <= batchData.currentTime() && batchData.currentTime() < 270)
- || (370 <= batchData.currentTime() && batchData.currentTime() < 440)) {
- assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if ((100 <= iterator.currentTime() && iterator.currentTime() < 170)
+ || (270 <= iterator.currentTime() && iterator.currentTime() < 340)) {
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
+ } else if ((200 <= iterator.currentTime() && iterator.currentTime() < 270)
+ || (370 <= iterator.currentTime() && iterator.currentTime() < 440)) {
+ assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(410, count);
} else if (i < 3 && j < 5) {
@@ -437,34 +444,35 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if ((100 <= batchData.currentTime() && batchData.currentTime() < 170)
- || (270 <= batchData.currentTime() && batchData.currentTime() < 340)) {
- assertEquals(batchData.currentTime() + 200, batchData.currentValue());
- } else if ((200 <= batchData.currentTime() && batchData.currentTime() < 270)
- || (370 <= batchData.currentTime() && batchData.currentTime() < 440)) {
- assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if ((100 <= iterator.currentTime() && iterator.currentTime() < 170)
+ || (270 <= iterator.currentTime() && iterator.currentTime() < 340)) {
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
+ } else if ((200 <= iterator.currentTime() && iterator.currentTime() < 270)
+ || (370 <= iterator.currentTime() && iterator.currentTime() < 440)) {
+ assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(410, count);
} else if (i < 3 && j < 5) {
@@ -522,33 +530,33 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 200
- || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
- assertEquals(batchData.currentTime(), batchData.currentValue());
- } else if (batchData.currentTime() < 850) {
- assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 200
+ || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ } else if (iterator.currentTime() < 850) {
+ assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 5)) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
@@ -575,33 +583,33 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 200
- || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
- assertEquals(batchData.currentTime(), batchData.currentValue());
- } else if (batchData.currentTime() < 850) {
- assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 200
+ || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ } else if (iterator.currentTime() < 850) {
+ assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 5)) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
@@ -650,33 +658,33 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 200
- || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
- assertEquals(batchData.currentTime(), batchData.currentValue());
- } else if (batchData.currentTime() < 850) {
- assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 200
+ || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ } else if (iterator.currentTime() < 850) {
+ assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == 0) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
@@ -703,33 +711,33 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 200
- || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
- assertEquals(batchData.currentTime(), batchData.currentValue());
- } else if (batchData.currentTime() < 850) {
- assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 200
+ || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ } else if (iterator.currentTime() < 850) {
+ assertEquals(iterator.currentTime() + 100, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == 0) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
@@ -780,25 +788,25 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(0, count);
}
}
@@ -817,25 +825,25 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(0, count);
}
}
@@ -860,28 +868,29 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
+
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(500, count);
}
}
@@ -904,28 +913,28 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(500, count);
}
}
@@ -959,34 +968,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() >= 600) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() >= 600) {
assertEquals(
- batchData.currentTime() + 200,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 200,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(400, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1015,34 +1024,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() >= 600) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() >= 600) {
assertEquals(
- batchData.currentTime() + 200,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 200,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(400, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1080,34 +1089,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() >= 600) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() >= 600) {
assertEquals(
- batchData.currentTime() + 200,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 200,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(400, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1136,34 +1145,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
false);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() >= 600) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() >= 600) {
assertEquals(
- batchData.currentTime() + 200,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 200,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(400, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1202,39 +1211,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 200
- || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 200
+ || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
- } else if (batchData.currentTime() < 850) {
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+ } else if (iterator.currentTime() < 850) {
assertEquals(
- batchData.currentTime() + 100,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 100,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime() + 200,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 200,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1450, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1263,39 +1272,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 200
- || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 200
+ || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
- } else if (batchData.currentTime() < 850) {
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+ } else if (iterator.currentTime() < 850) {
assertEquals(
- batchData.currentTime() + 100,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 100,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime() + 200,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 200,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1450, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1373,39 +1382,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 200
- || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 200
+ || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
- } else if (batchData.currentTime() < 850) {
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+ } else if (iterator.currentTime() < 850) {
assertEquals(
- batchData.currentTime() + 100,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 100,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime() + 200,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 200,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
@@ -1439,39 +1448,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 200
- || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 200
+ || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
- } else if (batchData.currentTime() < 850) {
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+ } else if (iterator.currentTime() < 850) {
assertEquals(
- batchData.currentTime() + 100,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 100,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime() + 200,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 200,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
@@ -1533,39 +1542,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 200
- || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 200
+ || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
- } else if (batchData.currentTime() < 850) {
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+ } else if (iterator.currentTime() < 850) {
assertEquals(
- batchData.currentTime() + 100,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 100,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime() + 200,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 200,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
@@ -1596,39 +1605,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 200
- || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 200
+ || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
- } else if (batchData.currentTime() < 850) {
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+ } else if (iterator.currentTime() < 850) {
assertEquals(
- batchData.currentTime() + 100,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 100,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime() + 200,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 200,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
assertEquals(0, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
@@ -1661,28 +1670,28 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(500, count);
}
}
@@ -1705,28 +1714,28 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
new ArrayList<>(),
targetResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(500, count);
}
}
@@ -1755,31 +1764,31 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d1",
"s1",
new MeasurementSchema("s1", TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() % 100 < 50) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() % 100 < 50) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(500, count);
List<TsFileResource> targetResources =
@@ -1790,31 +1799,31 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- tsFilesReader =
- new SeriesRawDataBatchReader(
+ tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() % 100 < 50) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() % 100 < 50) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(500, count);
}
@@ -1851,37 +1860,37 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
if (i == 0
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(1280, count);
} else if (i < 1 && j < 4) {
@@ -1946,45 +1955,45 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
- >= batchData.currentTime()) {
+ >= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
- batchData.currentTime());
+ iterator.currentTime());
if (i == 0
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(1280, count);
} else if (i < 1 && j < 4) {
@@ -2046,37 +2055,37 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
if (i == 0
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 4)) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
@@ -2142,45 +2151,45 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
- >= batchData.currentTime()) {
+ >= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
- batchData.currentTime());
+ iterator.currentTime());
if (i == 0
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 4)) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
@@ -2244,37 +2253,37 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
if (i == 0
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == 0 || i == 2) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
@@ -2334,45 +2343,45 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
- >= batchData.currentTime()) {
+ >= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
- batchData.currentTime());
+ iterator.currentTime());
if (i == 0
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == 0 || i == 2) {
assertEquals(0, count);
} else if (i < 2 && j < 3) {
@@ -2434,37 +2443,37 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
if (i == 0
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
} else {
@@ -2507,45 +2516,45 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
- >= batchData.currentTime()) {
+ >= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
- batchData.currentTime());
+ iterator.currentTime());
if (i == 0
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
} else {
@@ -2599,37 +2608,37 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
if (i == 0
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < 1) {
if (j < 4) {
assertEquals(630, count);
@@ -2709,45 +2718,45 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
if (measurementMaxTime.get(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
- >= batchData.currentTime()) {
+ >= iterator.currentTime()) {
Assert.fail();
}
measurementMaxTime.put(
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
- batchData.currentTime());
+ iterator.currentTime());
if (i == 0
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
} else if ((i < 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
} else {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ assertEquals(iterator.currentTime(), iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < 1) {
if (j < 4) {
assertEquals(630, count);
@@ -2792,34 +2801,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000",
Collections.singletonList("s1"),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() % 100 < 50) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() % 100 < 50) {
assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 10000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(500, count);
List<TsFileResource> targetResources =
@@ -2830,35 +2839,35 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- tsFilesReader =
- new SeriesRawDataBatchReader(
+ tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() % 100 < 50) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() % 100 < 50) {
assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 10000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
assertEquals(500, count);
}
@@ -2899,43 +2908,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 20000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 10000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
@@ -2971,43 +2980,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 20000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 10000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
assertEquals(1280, count);
} else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
@@ -3097,43 +3106,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 20000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 10000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
@@ -3203,43 +3212,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 20000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 10000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
|| (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
@@ -3335,43 +3344,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 20000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 10000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
}
@@ -3412,43 +3421,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 20000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 10000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
}
@@ -3548,43 +3557,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 20000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 10000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
}
@@ -3625,43 +3634,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+ while (iterator.hasNext()) {
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 20000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime() + 10000,
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
} else {
assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ iterator.currentTime(),
+ ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i == 0 || i == 1 || i == 2) {
assertEquals(0, count);
}
@@ -3811,30 +3820,30 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
"s" + j,
new MeasurementSchema("s" + j, TSDataType.INT64));
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
path,
TSDataType.INT64,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
targetResources,
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
- if (batchData.currentTime() < 20) {
- assertEquals(batchData.currentTime(), batchData.currentValue());
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() < 20) {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
} else {
- assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
}
count++;
- batchData.next();
+ iterator.next();
}
}
- tsFilesReader.close();
+ tsBlockReader.close();
if (i < 2 && j < 3) {
assertEquals(920, count);
} else {