You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/23 08:53:44 UTC
[iotdb] 02/03: delete SeriesReader
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/removeOldCode
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9eb6d92f7700bbe1f81467213e8608cd742e5687
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Fri Dec 23 16:43:13 2022 +0800
delete SeriesReader
---
.../db/metadata/utils/ResourceByPathUtils.java | 63 -
.../db/query/control/QueryResourceManager.java | 11 -
.../reader/series/SeriesRawDataBatchReader.java | 139 --
.../iotdb/db/query/reader/series/SeriesReader.java | 1383 --------------------
.../ReadPointCompactionPerformerTest.java | 3 +-
...eCrossSpaceCompactionWithFastPerformerTest.java | 121 +-
...sSpaceCompactionWithReadPointPerformerTest.java | 163 +--
.../SizeTieredCompactionRecoverTest.java | 230 ++--
8 files changed, 221 insertions(+), 1892 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java
index 00c04840e8..fedf352383 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.metadata.utils;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -37,8 +36,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.reader.series.AlignedSeriesReader;
-import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
@@ -51,7 +48,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
@@ -62,7 +58,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import static org.apache.iotdb.commons.path.AlignedPath.VECTOR_PLACEHOLDER;
@@ -81,17 +76,6 @@ public abstract class ResourceByPathUtils {
throw new UnsupportedOperationException("Should call exact sub class!");
}
- @TestOnly
- public abstract SeriesReader createSeriesReader(
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- List<TsFileResource> seqFileResource,
- List<TsFileResource> unseqFileResource,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending);
-
public abstract ITimeSeriesMetadata generateTimeSeriesMetadata(
List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList)
throws IOException;
@@ -126,29 +110,6 @@ class AlignedResourceByPathUtils extends ResourceByPathUtils {
this.partialPath = (AlignedPath) partialPath;
}
- @Override
- @TestOnly
- public AlignedSeriesReader createSeriesReader(
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- List<TsFileResource> seqFileResource,
- List<TsFileResource> unseqFileResource,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending) {
- return new AlignedSeriesReader(
- partialPath,
- allSensors,
- dataType,
- context,
- seqFileResource,
- unseqFileResource,
- timeFilter,
- valueFilter,
- ascending);
- }
-
/**
* Because the unclosed tsfile don't have TimeSeriesMetadata and memtables in the memory don't
* have chunkMetadata, but query will use these, so we need to generate it for them.
@@ -345,30 +306,6 @@ class MeasurementResourceByPathUtils extends ResourceByPathUtils {
this.partialPath = (MeasurementPath) partialPath;
}
- @Override
- @TestOnly
- public SeriesReader createSeriesReader(
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- List<TsFileResource> seqFileResource,
- List<TsFileResource> unseqFileResource,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending) {
- allSensors.add(partialPath.getMeasurement());
- return new SeriesReader(
- partialPath,
- allSensors,
- dataType,
- context,
- seqFileResource,
- unseqFileResource,
- timeFilter,
- valueFilter,
- ascending);
- }
-
/**
* Because the unclosed tsfile don't have TimeSeriesMetadata and memtables in the memory don't
* have chunkMetadata, but query will use these, so we need to generate it for them.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 0934f3fd10..6005c47024 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -40,16 +40,8 @@ public class QueryResourceManager {
private final AtomicLong queryIdAtom = new AtomicLong();
private final QueryFileManager filePathsManager;
- /**
- * Record QueryDataSource used in queries
- *
- * <p>Key: query job id. Value: QueryDataSource corresponding to each data region.
- */
- private final Map<Long, Map<String, QueryDataSource>> cachedQueryDataSourcesMap;
-
private QueryResourceManager() {
filePathsManager = new QueryFileManager();
- cachedQueryDataSourcesMap = new ConcurrentHashMap<>();
}
public static QueryResourceManager getInstance() {
@@ -86,9 +78,6 @@ public class QueryResourceManager {
// close and delete UDF temp files
TemporaryQueryDataFileService.getInstance().deregister(queryId);
-
- // remove cached QueryDataSource
- cachedQueryDataSourcesMap.remove(queryId);
}
public QueryFileManager getQueryFileManager() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
deleted file mode 100644
index 6aee37c213..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.reader.series;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class SeriesRawDataBatchReader implements IBatchReader {
-
- private final SeriesReader seriesReader;
-
- private BatchData batchData;
- private boolean hasCachedBatchData = false;
-
- @TestOnly
- public SeriesRawDataBatchReader(
- PartialPath seriesPath,
- TSDataType dataType,
- QueryContext context,
- List<TsFileResource> seqFileResource,
- List<TsFileResource> unseqFileResource,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending) {
- Set<String> allSensors = new HashSet<>();
- this.seriesReader =
- ResourceByPathUtils.getResourceInstance(seriesPath)
- .createSeriesReader(
- allSensors,
- dataType,
- context,
- seqFileResource,
- unseqFileResource,
- timeFilter,
- valueFilter,
- ascending);
- }
-
- /**
- * This method overrides the AbstractDataReader.hasNextOverlappedPage for pause reads, to achieve
- * a continuous read
- */
- public boolean hasNextBatch() throws IOException {
-
- if (hasCachedBatchData) {
- return true;
- }
-
- /*
- * consume page data firstly
- */
- if (readPageData()) {
- hasCachedBatchData = true;
- return true;
- }
-
- /*
- * consume chunk data secondly
- */
- if (readChunkData()) {
- hasCachedBatchData = true;
- return true;
- }
-
- /*
- * consume next file finally
- */
- while (seriesReader.hasNextFile()) {
- if (readChunkData()) {
- hasCachedBatchData = true;
- return true;
- }
- }
- return hasCachedBatchData;
- }
-
- public BatchData nextBatch() throws IOException {
- if (hasCachedBatchData || hasNextBatch()) {
- hasCachedBatchData = false;
- return batchData;
- }
- throw new IOException("no next batch");
- }
-
- public void close() throws IOException {
- // no resources need to close
- }
-
- private boolean readChunkData() throws IOException {
- while (seriesReader.hasNextChunk()) {
- if (readPageData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readPageData() throws IOException {
- while (seriesReader.hasNextPage()) {
- batchData = seriesReader.nextPage();
- if (!isEmpty(batchData)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean isEmpty(BatchData batchData) {
- return batchData == null || !batchData.hasCurrent();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
deleted file mode 100644
index 6cf2559349..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ /dev/null
@@ -1,1383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.reader.series;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.metadata.idtable.IDTable;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
-import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReaderPriority;
-import org.apache.iotdb.db.utils.FileLoaderUtils;
-import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
-import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
-import org.apache.iotdb.tsfile.read.reader.IPageReader;
-import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.function.ToLongFunction;
-import java.util.stream.Collectors;
-
-public class SeriesReader {
-
- // inner class of SeriesReader for order purpose
- protected TimeOrderUtils orderUtils;
-
- protected final PartialPath seriesPath;
-
- // all the sensors in this device;
- protected final Set<String> allSensors;
- protected final TSDataType dataType;
- protected final QueryContext context;
-
- /*
- * There is at most one is not null between timeFilter and valueFilter
- *
- * timeFilter is pushed down to all pages (seq, unseq) without correctness problem
- *
- * valueFilter is pushed down to non-overlapped page only
- */
- protected final Filter timeFilter;
- protected final Filter valueFilter;
-
- protected final QueryDataSource dataSource;
-
- /*
- * file index
- */
- protected int curSeqFileIndex;
- protected int curUnseqFileIndex;
-
- /*
- * TimeSeriesMetadata cache
- */
- protected ITimeSeriesMetadata firstTimeSeriesMetadata;
- protected final List<ITimeSeriesMetadata> seqTimeSeriesMetadata = new LinkedList<>();
- protected final PriorityQueue<ITimeSeriesMetadata> unSeqTimeSeriesMetadata;
-
- /*
- * chunk cache
- */
- protected IChunkMetadata firstChunkMetadata;
- protected final PriorityQueue<IChunkMetadata> cachedChunkMetadata;
-
- /*
- * page cache
- */
- protected VersionPageReader firstPageReader;
- protected final List<VersionPageReader> seqPageReaders = new LinkedList<>();
- protected final PriorityQueue<VersionPageReader> unSeqPageReaders;
-
- /*
- * point cache
- */
- protected final PriorityMergeReader mergeReader;
-
- /*
- * result cache
- */
- protected boolean hasCachedNextOverlappedPage;
- protected BatchData cachedBatchData;
-
- /**
- * @param seriesPath For querying aligned series, the seriesPath should be AlignedPath. All
- * selected series belonging to one aligned device should be all in this one AlignedPath's
- * measurementList.
- * @param allSensors For querying aligned series, allSensors are not used.
- */
- @TestOnly
- public SeriesReader(
- PartialPath seriesPath,
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- List<TsFileResource> seqFileResource,
- List<TsFileResource> unseqFileResource,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending) {
- this.seriesPath = IDTable.translateQueryPath(seriesPath);
- this.allSensors = allSensors;
- this.dataType = dataType;
- this.context = context;
- this.dataSource = new QueryDataSource(seqFileResource, unseqFileResource);
- QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), ascending);
- this.timeFilter = timeFilter;
- this.valueFilter = valueFilter;
-
- if (ascending) {
- this.orderUtils = new AscTimeOrderUtils();
- mergeReader = getPriorityMergeReader();
- this.curSeqFileIndex = 0;
- this.curUnseqFileIndex = 0;
- } else {
- this.orderUtils = new DescTimeOrderUtils();
- mergeReader = getDescPriorityMergeReader();
- this.curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
- this.curUnseqFileIndex = 0;
- }
-
- unSeqTimeSeriesMetadata =
- new PriorityQueue<>(
- orderUtils.comparingLong(
- timeSeriesMetadata -> orderUtils.getOrderTime(timeSeriesMetadata.getStatistics())));
- cachedChunkMetadata =
- new PriorityQueue<>(
- orderUtils.comparingLong(
- chunkMetadata -> orderUtils.getOrderTime(chunkMetadata.getStatistics())));
- unSeqPageReaders =
- new PriorityQueue<>(
- orderUtils.comparingLong(
- versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
- }
-
- protected PriorityMergeReader getPriorityMergeReader() {
- return new PriorityMergeReader();
- }
-
- protected DescPriorityMergeReader getDescPriorityMergeReader() {
- return new DescPriorityMergeReader();
- }
-
- public boolean isEmpty() throws IOException {
- return !(hasNextPage() || hasNextChunk() || hasNextFile());
- }
-
- boolean hasNextFile() throws IOException {
-
- if (!unSeqPageReaders.isEmpty()
- || firstPageReader != null
- || mergeReader.hasNextTimeValuePair()) {
- throw new IOException(
- "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
- + unSeqPageReaders.isEmpty()
- + " firstPageReader != null is "
- + (firstPageReader != null)
- + " mergeReader.hasNextTimeValuePair() = "
- + mergeReader.hasNextTimeValuePair());
- }
-
- if (firstChunkMetadata != null || !cachedChunkMetadata.isEmpty()) {
- throw new IOException("all cached chunks should be consumed first");
- }
-
- if (firstTimeSeriesMetadata != null) {
- return true;
- }
-
- while (firstTimeSeriesMetadata == null
- && (orderUtils.hasNextSeqResource()
- || orderUtils.hasNextUnseqResource()
- || !seqTimeSeriesMetadata.isEmpty()
- || !unSeqTimeSeriesMetadata.isEmpty())) {
- // init first time series metadata whose startTime is minimum
- tryToUnpackAllOverlappedFilesToTimeSeriesMetadata();
- }
-
- return firstTimeSeriesMetadata != null;
- }
-
- boolean isFileOverlapped() throws IOException {
- if (firstTimeSeriesMetadata == null) {
- throw new IOException("no first file");
- }
-
- Statistics fileStatistics = firstTimeSeriesMetadata.getStatistics();
- return !seqTimeSeriesMetadata.isEmpty()
- && orderUtils.isOverlapped(fileStatistics, seqTimeSeriesMetadata.get(0).getStatistics())
- || !unSeqTimeSeriesMetadata.isEmpty()
- && orderUtils.isOverlapped(
- fileStatistics, unSeqTimeSeriesMetadata.peek().getStatistics());
- }
-
- Statistics currentFileStatistics() {
- return firstTimeSeriesMetadata.getStatistics();
- }
-
- Statistics currentFileStatistics(int index) throws IOException {
- if (!(firstTimeSeriesMetadata instanceof AlignedTimeSeriesMetadata)) {
- throw new IOException("Can only get statistics by index from alignedTimeSeriesMetaData");
- }
- return ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getStatistics(index);
- }
-
- public Statistics currentFileTimeStatistics() throws IOException {
- if (!(firstTimeSeriesMetadata instanceof AlignedTimeSeriesMetadata)) {
- throw new IOException("Can only get statistics of time column from alignedChunkMetaData");
- }
- return ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getTimeStatistics();
- }
-
- boolean currentFileModified() throws IOException {
- if (firstTimeSeriesMetadata == null) {
- throw new IOException("no first file");
- }
- return firstTimeSeriesMetadata.isModified();
- }
-
- void skipCurrentFile() {
- firstTimeSeriesMetadata = null;
- }
-
- /**
- * This method should be called after hasNextFile() until no next chunk, make sure that all
- * overlapped chunks are consumed
- */
- boolean hasNextChunk() throws IOException {
-
- if (!unSeqPageReaders.isEmpty()
- || firstPageReader != null
- || mergeReader.hasNextTimeValuePair()) {
- throw new IOException(
- "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
- + unSeqPageReaders.isEmpty()
- + " firstPageReader != null is "
- + (firstPageReader != null)
- + " mergeReader.hasNextTimeValuePair() = "
- + mergeReader.hasNextTimeValuePair());
- }
-
- if (firstChunkMetadata != null) {
- return true;
- // hasNextFile() has not been invoked
- } else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) {
- return false;
- }
-
- while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
- initFirstChunkMetadata();
- }
- return firstChunkMetadata != null;
- }
-
- /** construct first chunk metadata */
- private void initFirstChunkMetadata() throws IOException {
- if (firstTimeSeriesMetadata != null) {
- /*
- * try to unpack all overlapped TimeSeriesMetadata to cachedChunkMetadata
- */
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(
- orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()));
- unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
- orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()), true);
- } else {
- /*
- * first time series metadata is already unpacked, consume cached ChunkMetadata
- */
- while (!cachedChunkMetadata.isEmpty()) {
- firstChunkMetadata = cachedChunkMetadata.peek();
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(
- orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()));
- unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
- orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false);
- if (firstChunkMetadata.equals(cachedChunkMetadata.peek())) {
- firstChunkMetadata = cachedChunkMetadata.poll();
- break;
- }
- }
- }
- if (valueFilter != null
- && firstChunkMetadata != null
- && !isChunkOverlapped()
- && !firstChunkMetadata.isModified()
- && !valueFilter.satisfy(firstChunkMetadata.getStatistics())) {
- skipCurrentChunk();
- }
- }
-
- private void unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
- long endpointTime, boolean init) throws IOException {
- while (!seqTimeSeriesMetadata.isEmpty()
- && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) {
- unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0));
- }
- while (!unSeqTimeSeriesMetadata.isEmpty()
- && orderUtils.isOverlapped(endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) {
- unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll());
- }
-
- if (firstTimeSeriesMetadata != null
- && orderUtils.isOverlapped(endpointTime, firstTimeSeriesMetadata.getStatistics())) {
- unpackOneTimeSeriesMetadata(firstTimeSeriesMetadata);
- firstTimeSeriesMetadata = null;
- }
-
- if (init && firstChunkMetadata == null && !cachedChunkMetadata.isEmpty()) {
- firstChunkMetadata = cachedChunkMetadata.poll();
- }
- }
-
- protected void unpackOneTimeSeriesMetadata(ITimeSeriesMetadata timeSeriesMetadata)
- throws IOException {
- List<IChunkMetadata> chunkMetadataList =
- FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata);
- chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq()));
- cachedChunkMetadata.addAll(chunkMetadataList);
- }
-
- boolean isChunkOverlapped() throws IOException {
- if (firstChunkMetadata == null) {
- throw new IOException("no first chunk");
- }
-
- Statistics chunkStatistics = firstChunkMetadata.getStatistics();
- return !cachedChunkMetadata.isEmpty()
- && orderUtils.isOverlapped(chunkStatistics, cachedChunkMetadata.peek().getStatistics());
- }
-
- Statistics currentChunkStatistics() {
- return firstChunkMetadata.getStatistics();
- }
-
- Statistics currentChunkStatistics(int index) throws IOException {
- if (!(firstChunkMetadata instanceof AlignedChunkMetadata)) {
- throw new IOException("Can only get statistics by index from vectorChunkMetaData");
- }
- return ((AlignedChunkMetadata) firstChunkMetadata).getStatistics(index);
- }
-
- Statistics currentChunkTimeStatistics() throws IOException {
- if (!(firstChunkMetadata instanceof AlignedChunkMetadata)) {
- throw new IOException("Can only get statistics of time column from alignedChunkMetaData");
- }
- return ((AlignedChunkMetadata) firstChunkMetadata).getTimeStatistics();
- }
-
- boolean currentChunkModified() throws IOException {
- if (firstChunkMetadata == null) {
- throw new IOException("no first chunk");
- }
- return firstChunkMetadata.isModified();
- }
-
- void skipCurrentChunk() {
- firstChunkMetadata = null;
- }
-
- /**
- * This method should be called after hasNextChunk() until no next page, make sure that all
- * overlapped pages are consumed
- */
- @SuppressWarnings("squid:S3776")
- // Suppress high Cognitive Complexity warning
- boolean hasNextPage() throws IOException {
-
- /*
- * has overlapped data before
- */
- if (hasCachedNextOverlappedPage) {
- return true;
- } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
- if (hasNextOverlappedPage()) {
- cachedBatchData = nextOverlappedPage();
- if (cachedBatchData != null && cachedBatchData.hasCurrent()) {
- hasCachedNextOverlappedPage = true;
- return true;
- }
- }
- }
-
- if (firstPageReader != null) {
- return true;
- }
-
- /*
- * construct first page reader
- */
- if (firstChunkMetadata != null) {
- /*
- * try to unpack all overlapped ChunkMetadata to cachedPageReaders
- */
- unpackAllOverlappedChunkMetadataToPageReaders(
- orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
- } else {
- /*
- * first chunk metadata is already unpacked, consume cached pages
- */
- initFirstPageReader();
- }
-
- if (isExistOverlappedPage()) {
- return true;
- }
-
- // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page
- // readers
- while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
-
- initFirstPageReader();
-
- if (isExistOverlappedPage()) {
- return true;
- }
- }
- return firstPageReader != null;
- }
-
- private boolean isExistOverlappedPage() throws IOException {
- if (firstPageOverlapped()) {
- /*
- * next page is overlapped, read overlapped data and cache it
- */
- if (hasNextOverlappedPage()) {
- cachedBatchData = nextOverlappedPage();
- if (cachedBatchData != null && cachedBatchData.hasCurrent()) {
- hasCachedNextOverlappedPage = true;
- return true;
- }
- }
- }
-
- return false;
- }
-
- private boolean firstPageOverlapped() throws IOException {
- if (firstPageReader == null) {
- return false;
- }
-
- long endpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime);
- unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(endpointTime, false);
- unpackAllOverlappedChunkMetadataToPageReaders(endpointTime, false);
-
- return (!seqPageReaders.isEmpty()
- && orderUtils.isOverlapped(
- firstPageReader.getStatistics(), seqPageReaders.get(0).getStatistics()))
- || (!unSeqPageReaders.isEmpty()
- && orderUtils.isOverlapped(
- firstPageReader.getStatistics(), unSeqPageReaders.peek().getStatistics())
- || (mergeReader.hasNextTimeValuePair()
- && orderUtils.isOverlapped(
- mergeReader.currentTimeValuePair().getTimestamp(),
- firstPageReader.getStatistics())));
- }
-
- private void unpackAllOverlappedChunkMetadataToPageReaders(long endpointTime, boolean init)
- throws IOException {
- if (firstChunkMetadata != null
- && orderUtils.isOverlapped(endpointTime, firstChunkMetadata.getStatistics())) {
- unpackOneChunkMetaData(firstChunkMetadata);
- firstChunkMetadata = null;
- }
- // In case unpacking too many sequence chunks
- boolean hasMeetSeq = false;
- while (!cachedChunkMetadata.isEmpty()
- && orderUtils.isOverlapped(endpointTime, cachedChunkMetadata.peek().getStatistics())) {
- if (cachedChunkMetadata.peek().isSeq() && hasMeetSeq) {
- break;
- } else if (cachedChunkMetadata.peek().isSeq()) {
- hasMeetSeq = true;
- }
- unpackOneChunkMetaData(cachedChunkMetadata.poll());
- }
- if (init
- && firstPageReader == null
- && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
- initFirstPageReader();
- }
- }
-
- private void unpackOneChunkMetaData(IChunkMetadata chunkMetaData) throws IOException {
- List<IPageReader> pageReaderList =
- FileLoaderUtils.loadPageReaderList(chunkMetaData, timeFilter);
-
- if (chunkMetaData.isSeq()) {
- if (orderUtils.getAscending()) {
- for (IPageReader iPageReader : pageReaderList) {
- seqPageReaders.add(
- new VersionPageReader(
- chunkMetaData.getVersion(),
- chunkMetaData.getOffsetOfChunkHeader(),
- iPageReader,
- true));
- }
- } else {
- for (int i = pageReaderList.size() - 1; i >= 0; i--) {
- seqPageReaders.add(
- new VersionPageReader(
- chunkMetaData.getVersion(),
- chunkMetaData.getOffsetOfChunkHeader(),
- pageReaderList.get(i),
- true));
- }
- }
- } else {
- pageReaderList.forEach(
- pageReader ->
- unSeqPageReaders.add(
- new VersionPageReader(
- chunkMetaData.getVersion(),
- chunkMetaData.getOffsetOfChunkHeader(),
- pageReader,
- false)));
- }
- }
-
- /**
- * This method should be called after calling hasNextPage.
- *
- * <p>hasNextPage may cache firstPageReader if it is not overlapped or cached a BatchData if the
- * first page is overlapped
- */
- boolean isPageOverlapped() throws IOException {
-
- /*
- * has an overlapped page
- */
- if (hasCachedNextOverlappedPage) {
- return true;
- }
-
- /*
- * has a non-overlapped page in firstPageReader
- */
- if (mergeReader.hasNextTimeValuePair()
- && ((orderUtils.getAscending()
- && mergeReader.currentTimeValuePair().getTimestamp()
- <= firstPageReader.getStatistics().getEndTime())
- || (!orderUtils.getAscending()
- && mergeReader.currentTimeValuePair().getTimestamp()
- >= firstPageReader.getStatistics().getStartTime()))) {
- throw new IOException("overlapped data should be consumed first");
- }
-
- Statistics firstPageStatistics = firstPageReader.getStatistics();
-
- return !unSeqPageReaders.isEmpty()
- && orderUtils.isOverlapped(firstPageStatistics, unSeqPageReaders.peek().getStatistics());
- }
-
- Statistics currentPageStatistics() {
- if (firstPageReader == null) {
- return null;
- }
- return firstPageReader.getStatistics();
- }
-
- Statistics currentPageStatistics(int index) throws IOException {
- if (firstPageReader == null) {
- return null;
- }
- if (!(firstPageReader.isAlignedPageReader())) {
- throw new IOException("Can only get statistics by index from AlignedPageReader");
- }
- return firstPageReader.getStatistics(index);
- }
-
- Statistics currentPageTimeStatistics() throws IOException {
- if (firstPageReader == null) {
- return null;
- }
- if (!(firstPageReader.isAlignedPageReader())) {
- throw new IOException("Can only get statistics of time column from AlignedPageReader");
- }
- return firstPageReader.getTimeStatistics();
- }
-
- boolean currentPageModified() throws IOException {
- if (firstPageReader == null) {
- throw new IOException("no first page");
- }
- return firstPageReader.isModified();
- }
-
- void skipCurrentPage() {
- firstPageReader = null;
- }
-
- /** This method should only be used when the method isPageOverlapped() return true. */
- BatchData nextPage() throws IOException {
-
- if (hasCachedNextOverlappedPage) {
- hasCachedNextOverlappedPage = false;
- return cachedBatchData;
- } else {
-
- /*
- * next page is not overlapped, push down value filter if it exists
- */
- if (valueFilter != null) {
- firstPageReader.setFilter(valueFilter);
- }
- BatchData batchData = firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending());
- firstPageReader = null;
-
- return batchData;
- }
- }
-
- /**
- * read overlapped data till currentLargestEndTime in mergeReader, if current batch does not
- * contain data, read till next currentLargestEndTime again
- */
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- private boolean hasNextOverlappedPage() throws IOException {
-
- if (hasCachedNextOverlappedPage) {
- return true;
- }
-
- tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
-
- while (true) {
-
- // may has overlapped data
- if (mergeReader.hasNextTimeValuePair()) {
-
- cachedBatchData =
- BatchDataFactory.createBatchData(dataType, orderUtils.getAscending(), true);
- long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
- while (mergeReader.hasNextTimeValuePair()) {
-
- /*
- * get current first point in mergeReader, this maybe overlapped later
- */
- TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
-
- if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) {
- /*
- * when the merged point excesses the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime
- * 1. has cached batch data, we don't need to read more data, just use the cached data later
- * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
- * we could just use the first page reader later
- * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
- * we could use the first sequence page reader later
- */
- if (cachedBatchData.hasCurrent()
- || firstPageReader != null
- || !seqPageReaders.isEmpty()) {
- break;
- }
- // so, we don't have other data except mergeReader
- currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
- }
-
- // unpack all overlapped data for the first timeValuePair
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
- unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
- timeValuePair.getTimestamp(), false);
- unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
- unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
-
- // update if there are unpacked unSeqPageReaders
- timeValuePair = mergeReader.currentTimeValuePair();
-
- // from now, the unsequence reader is all unpacked, so we don't need to consider it
- // we has first page reader now
- if (firstPageReader != null) {
- // if current timeValuePair excesses the first page reader's end time, we just use the
- // cached data
- if ((orderUtils.getAscending()
- && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime())
- || (!orderUtils.getAscending()
- && timeValuePair.getTimestamp()
- < firstPageReader.getStatistics().getStartTime())) {
- cachedBatchData.flip();
- hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
- return hasCachedNextOverlappedPage;
- } else if (orderUtils.isOverlapped(
- timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
- // current timeValuePair is overlapped with firstPageReader, add it to merged reader
- // and update endTime to the max end time
- mergeReader.addReader(
- firstPageReader
- .getAllSatisfiedPageData(orderUtils.getAscending())
- .getBatchDataIterator(),
- firstPageReader.version,
- orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
- context);
- currentPageEndPointTime =
- updateEndPointTime(currentPageEndPointTime, firstPageReader);
- firstPageReader = null;
- }
- }
-
- // the seq page readers is not empty, just like first page reader
- if (!seqPageReaders.isEmpty()) {
- if ((orderUtils.getAscending()
- && timeValuePair.getTimestamp()
- > seqPageReaders.get(0).getStatistics().getEndTime())
- || (!orderUtils.getAscending()
- && timeValuePair.getTimestamp()
- < seqPageReaders.get(0).getStatistics().getStartTime())) {
- cachedBatchData.flip();
- hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
- return hasCachedNextOverlappedPage;
- } else if (orderUtils.isOverlapped(
- timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
- VersionPageReader pageReader = seqPageReaders.remove(0);
- mergeReader.addReader(
- pageReader
- .getAllSatisfiedPageData(orderUtils.getAscending())
- .getBatchDataIterator(),
- pageReader.version,
- orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
- context);
- currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader);
- }
- }
-
- /*
- * get the latest first point in mergeReader
- */
- timeValuePair = mergeReader.nextTimeValuePair();
-
- Object valueForFilter = timeValuePair.getValue().getValue();
-
- // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will
- // only accept AlignedPath with only one sub sensor
- if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
- for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
- if (tsPrimitiveType != null) {
- valueForFilter = tsPrimitiveType.getValue();
- break;
- }
- }
- }
-
- if (valueFilter == null
- || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
- cachedBatchData.putAnObject(
- timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
- }
- }
- cachedBatchData.flip();
- hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
- /*
- * if current overlapped page has valid data, return, otherwise read next overlapped page
- */
- if (hasCachedNextOverlappedPage) {
- return true;
- } else if (mergeReader.hasNextTimeValuePair()) {
- // condition: seqPage.endTime < mergeReader.currentTime
- return false;
- }
- } else {
- return false;
- }
- }
- }
-
- private long updateEndPointTime(long currentPageEndPointTime, VersionPageReader pageReader) {
- if (orderUtils.getAscending()) {
- return Math.min(currentPageEndPointTime, pageReader.getStatistics().getEndTime());
- } else {
- return Math.max(currentPageEndPointTime, pageReader.getStatistics().getStartTime());
- }
- }
-
- private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
-
- /*
- * no cached page readers
- */
- if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
- return;
- }
-
- /*
- * init firstPageReader
- */
- if (firstPageReader == null) {
- initFirstPageReader();
- }
-
- long currentPageEndpointTime;
- if (mergeReader.hasNextTimeValuePair()) {
- currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
- } else {
- currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
- }
-
- /*
- * put all currently directly overlapped unseq page reader to merge reader
- */
- unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
- }
-
- private void initFirstPageReader() throws IOException {
- while (this.firstPageReader == null) {
- VersionPageReader firstPageReader = getFirstPageReaderFromCachedReaders();
-
- // unpack overlapped page using current page reader
- if (firstPageReader != null) {
- long overlapCheckTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(overlapCheckTime);
- unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(overlapCheckTime, false);
- unpackAllOverlappedChunkMetadataToPageReaders(overlapCheckTime, false);
-
- // this page after unpacking must be the first page
- if (firstPageReader.equals(getFirstPageReaderFromCachedReaders())) {
- this.firstPageReader = firstPageReader;
- if (!seqPageReaders.isEmpty() && firstPageReader.equals(seqPageReaders.get(0))) {
- seqPageReaders.remove(0);
- break;
- } else if (!unSeqPageReaders.isEmpty()
- && firstPageReader.equals(unSeqPageReaders.peek())) {
- unSeqPageReaders.poll();
- break;
- }
- }
- } else {
- return;
- }
- }
- }
-
- // We use get() and peek() here in case it's not the first page reader before unpacking
- private VersionPageReader getFirstPageReaderFromCachedReaders() {
- VersionPageReader firstPageReader = null;
- if (!seqPageReaders.isEmpty() && !unSeqPageReaders.isEmpty()) {
- if (orderUtils.isTakeSeqAsFirst(
- seqPageReaders.get(0).getStatistics(), unSeqPageReaders.peek().getStatistics())) {
- firstPageReader = seqPageReaders.get(0);
- } else {
- firstPageReader = unSeqPageReaders.peek();
- }
- } else if (!seqPageReaders.isEmpty()) {
- firstPageReader = seqPageReaders.get(0);
- } else if (!unSeqPageReaders.isEmpty()) {
- firstPageReader = unSeqPageReaders.peek();
- }
- return firstPageReader;
- }
-
- private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)
- throws IOException {
- while (!unSeqPageReaders.isEmpty()
- && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().data.getStatistics())) {
- putPageReaderToMergeReader(unSeqPageReaders.poll());
- }
- if (firstPageReader != null
- && !firstPageReader.isSeq()
- && orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) {
- putPageReaderToMergeReader(firstPageReader);
- firstPageReader = null;
- }
- }
-
- private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
- mergeReader.addReader(
- pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getBatchDataIterator(),
- pageReader.version,
- orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
- context);
- }
-
- private BatchData nextOverlappedPage() throws IOException {
- if (hasCachedNextOverlappedPage || hasNextOverlappedPage()) {
- hasCachedNextOverlappedPage = false;
- return cachedBatchData;
- }
- throw new IOException("No more batch data");
- }
-
- private LinkedList<TsFileResource> sortUnSeqFileResources(List<TsFileResource> tsFileResources) {
- return tsFileResources.stream()
- .sorted(orderUtils.comparingLong(tsFileResource -> orderUtils.getOrderTime(tsFileResource)))
- .collect(Collectors.toCollection(LinkedList::new));
- }
-
- /**
- * unpack all overlapped seq/unseq files and find the first TimeSeriesMetadata
- *
- * <p>Because there may be too many files in the scenario used by the user, we cannot open all the
- * chunks at once, which may cause OOM, so we can only unpack one file at a time when needed. This
- * approach is likely to be ubiquitous, but it keeps the system running smoothly
- */
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- protected void tryToUnpackAllOverlappedFilesToTimeSeriesMetadata() throws IOException {
- /*
- * Fill sequence TimeSeriesMetadata List until it is not empty
- */
- while (seqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextSeqResource()) {
- unpackSeqTsFileResource();
- }
-
- /*
- * Fill unSequence TimeSeriesMetadata Priority Queue until it is not empty
- */
- while (unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) {
- unpackUnseqTsFileResource();
- }
-
- /*
- * find end time of the first TimeSeriesMetadata
- */
- long endTime = -1L;
- if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
- // only has seq
- endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics());
- } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
- // only has unseq
- endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics());
- } else if (!seqTimeSeriesMetadata.isEmpty()) {
- // has seq and unseq
- endTime =
- orderUtils.getCurrentEndPoint(
- seqTimeSeriesMetadata.get(0).getStatistics(),
- unSeqTimeSeriesMetadata.peek().getStatistics());
- }
-
- /*
- * unpack all directly overlapped seq/unseq files with first TimeSeriesMetadata
- */
- if (endTime != -1) {
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(endTime);
- }
-
- /*
- * update the first TimeSeriesMetadata
- */
- if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
- // only has seq
- firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0);
- } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
- // only has unseq
- firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
- } else if (!seqTimeSeriesMetadata.isEmpty()) {
- // has seq and unseq
- if (orderUtils.isTakeSeqAsFirst(
- seqTimeSeriesMetadata.get(0).getStatistics(),
- unSeqTimeSeriesMetadata.peek().getStatistics())) {
- firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0);
- } else {
- firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
- }
- }
- if (valueFilter != null
- && firstTimeSeriesMetadata != null
- && !isFileOverlapped()
- && !firstTimeSeriesMetadata.isModified()
- && !valueFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) {
- firstTimeSeriesMetadata = null;
- }
- }
-
- protected void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime)
- throws IOException {
- while (orderUtils.hasNextUnseqResource()
- && orderUtils.isOverlapped(endpointTime, orderUtils.getNextUnseqFileResource(false))) {
- unpackUnseqTsFileResource();
- }
- while (orderUtils.hasNextSeqResource()
- && orderUtils.isOverlapped(endpointTime, orderUtils.getNextSeqFileResource(false))) {
- unpackSeqTsFileResource();
- }
- }
-
- protected void unpackSeqTsFileResource() throws IOException {
- ITimeSeriesMetadata timeseriesMetadata =
- loadTimeSeriesMetadata(
- orderUtils.getNextSeqFileResource(true),
- seriesPath,
- context,
- getAnyFilter(),
- allSensors);
- if (timeseriesMetadata != null) {
- timeseriesMetadata.setSeq(true);
- seqTimeSeriesMetadata.add(timeseriesMetadata);
- }
- }
-
- protected void unpackUnseqTsFileResource() throws IOException {
- ITimeSeriesMetadata timeseriesMetadata =
- loadTimeSeriesMetadata(
- orderUtils.getNextUnseqFileResource(true),
- seriesPath,
- context,
- getAnyFilter(),
- allSensors);
- if (timeseriesMetadata != null) {
- timeseriesMetadata.setModified(true);
- timeseriesMetadata.setSeq(false);
- unSeqTimeSeriesMetadata.add(timeseriesMetadata);
- }
- }
-
- protected ITimeSeriesMetadata loadTimeSeriesMetadata(
- TsFileResource resource,
- PartialPath seriesPath,
- QueryContext context,
- Filter filter,
- Set<String> allSensors)
- throws IOException {
- return FileLoaderUtils.loadTimeSeriesMetadata(
- resource, seriesPath, context, filter, allSensors);
- }
-
- protected Filter getAnyFilter() {
- return timeFilter != null ? timeFilter : valueFilter;
- }
-
- void setTimeFilter(long timestamp) {
- ((UnaryFilter) timeFilter).setValue(timestamp);
- }
-
- Filter getTimeFilter() {
- return timeFilter;
- }
-
- private class VersionPageReader {
-
- protected PriorityMergeReader.MergeReaderPriority version;
- protected IPageReader data;
-
- protected boolean isSeq;
-
- VersionPageReader(long version, long offset, IPageReader data, boolean isSeq) {
- this.version = new MergeReaderPriority(version, offset);
- this.data = data;
- this.isSeq = isSeq;
- }
-
- public boolean isAlignedPageReader() {
- return data instanceof IAlignedPageReader;
- }
-
- Statistics getStatistics() {
- return data.getStatistics();
- }
-
- Statistics getStatistics(int index) throws IOException {
- if (!(data instanceof IAlignedPageReader)) {
- throw new IOException("Can only get statistics by index from AlignedPageReader");
- }
- return ((IAlignedPageReader) data).getStatistics(index);
- }
-
- Statistics getTimeStatistics() throws IOException {
- if (!(data instanceof IAlignedPageReader)) {
- throw new IOException("Can only get statistics of time column from AlignedPageReader");
- }
- return ((IAlignedPageReader) data).getTimeStatistics();
- }
-
- BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
- return data.getAllSatisfiedPageData(ascending);
- }
-
- void setFilter(Filter filter) {
- data.setFilter(filter);
- }
-
- boolean isModified() {
- return data.isModified();
- }
-
- public boolean isSeq() {
- return isSeq;
- }
- }
-
- public interface TimeOrderUtils {
-
- long getOrderTime(Statistics<? extends Object> statistics);
-
- long getOrderTime(TsFileResource fileResource);
-
- long getOverlapCheckTime(Statistics<? extends Object> range);
-
- boolean isOverlapped(Statistics<? extends Object> left, Statistics<? extends Object> right);
-
- boolean isOverlapped(long time, Statistics<? extends Object> right);
-
- boolean isOverlapped(long time, TsFileResource right);
-
- <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor);
-
- long getCurrentEndPoint(long time, Statistics<? extends Object> statistics);
-
- long getCurrentEndPoint(
- Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics);
-
- boolean isExcessEndpoint(long time, long endpointTime);
-
- /** Return true if taking first page reader from seq readers */
- boolean isTakeSeqAsFirst(
- Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics);
-
- boolean getAscending();
-
- boolean hasNextSeqResource();
-
- boolean hasNextUnseqResource();
-
- TsFileResource getNextSeqFileResource(boolean isDelete);
-
- TsFileResource getNextUnseqFileResource(boolean isDelete);
- }
-
- class DescTimeOrderUtils implements TimeOrderUtils {
-
- @Override
- public long getOrderTime(Statistics statistics) {
- return statistics.getEndTime();
- }
-
- @Override
- public long getOrderTime(TsFileResource fileResource) {
- return fileResource.getEndTime(seriesPath.getDevice());
- }
-
- @Override
- public long getOverlapCheckTime(Statistics range) {
- return range.getStartTime();
- }
-
- @Override
- public boolean isOverlapped(Statistics left, Statistics right) {
- return left.getStartTime() <= right.getEndTime();
- }
-
- @Override
- public boolean isOverlapped(long time, Statistics right) {
- return time <= right.getEndTime();
- }
-
- @Override
- public boolean isOverlapped(long time, TsFileResource right) {
- return time <= right.getEndTime(seriesPath.getDevice());
- }
-
- @Override
- public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) {
- Objects.requireNonNull(keyExtractor);
- return (Comparator<T> & Serializable)
- (c1, c2) -> Long.compare(keyExtractor.applyAsLong(c2), keyExtractor.applyAsLong(c1));
- }
-
- @Override
- public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) {
- return Math.max(time, statistics.getStartTime());
- }
-
- @Override
- public long getCurrentEndPoint(
- Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
- return Math.max(seqStatistics.getStartTime(), unseqStatistics.getStartTime());
- }
-
- @Override
- public boolean isExcessEndpoint(long time, long endpointTime) {
- return time < endpointTime;
- }
-
- @Override
- public boolean isTakeSeqAsFirst(
- Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
- return seqStatistics.getEndTime() > unseqStatistics.getEndTime();
- }
-
- @Override
- public boolean getAscending() {
- return false;
- }
-
- @Override
- public boolean hasNextSeqResource() {
- while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
- TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
- if (tsFileResource != null
- && tsFileResource.isSatisfied(
- seriesPath.getDevice(), timeFilter, true, context.isDebug())) {
- break;
- }
- curSeqFileIndex--;
- }
- return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
- }
-
- @Override
- public boolean hasNextUnseqResource() {
- while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
- TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
- if (tsFileResource != null
- && tsFileResource.isSatisfied(
- seriesPath.getDevice(), timeFilter, false, context.isDebug())) {
- break;
- }
- curUnseqFileIndex++;
- }
- return dataSource.hasNextUnseqResource(curUnseqFileIndex);
- }
-
- @Override
- public TsFileResource getNextSeqFileResource(boolean isDelete) {
- TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
- if (isDelete) {
- curSeqFileIndex--;
- }
- return tsFileResource;
- }
-
- @Override
- public TsFileResource getNextUnseqFileResource(boolean isDelete) {
- TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
- if (isDelete) {
- curUnseqFileIndex++;
- }
- return tsFileResource;
- }
- }
-
- class AscTimeOrderUtils implements TimeOrderUtils {
-
- @Override
- public long getOrderTime(Statistics statistics) {
- return statistics.getStartTime();
- }
-
- @Override
- public long getOrderTime(TsFileResource fileResource) {
- return fileResource.getStartTime(seriesPath.getDevice());
- }
-
- @Override
- public long getOverlapCheckTime(Statistics range) {
- return range.getEndTime();
- }
-
- @Override
- public boolean isOverlapped(Statistics left, Statistics right) {
- return left.getEndTime() >= right.getStartTime();
- }
-
- @Override
- public boolean isOverlapped(long time, Statistics right) {
- return time >= right.getStartTime();
- }
-
- @Override
- public boolean isOverlapped(long time, TsFileResource right) {
- return time >= right.getStartTime(seriesPath.getDevice());
- }
-
- @Override
- public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) {
- Objects.requireNonNull(keyExtractor);
- return (Comparator<T> & Serializable)
- (c1, c2) -> Long.compare(keyExtractor.applyAsLong(c1), keyExtractor.applyAsLong(c2));
- }
-
- @Override
- public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) {
- return Math.min(time, statistics.getEndTime());
- }
-
- @Override
- public long getCurrentEndPoint(
- Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
- return Math.min(seqStatistics.getEndTime(), unseqStatistics.getEndTime());
- }
-
- @Override
- public boolean isExcessEndpoint(long time, long endpointTime) {
- return time > endpointTime;
- }
-
- @Override
- public boolean isTakeSeqAsFirst(
- Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
- return seqStatistics.getStartTime() < unseqStatistics.getStartTime();
- }
-
- @Override
- public boolean getAscending() {
- return true;
- }
-
- @Override
- public boolean hasNextSeqResource() {
- while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
- TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
- if (tsFileResource != null
- && tsFileResource.isSatisfied(
- seriesPath.getDevice(), timeFilter, true, context.isDebug())) {
- break;
- }
- curSeqFileIndex++;
- }
- return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
- }
-
- @Override
- public boolean hasNextUnseqResource() {
- while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
- TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
- if (tsFileResource != null
- && tsFileResource.isSatisfied(
- seriesPath.getDevice(), timeFilter, false, context.isDebug())) {
- break;
- }
- curUnseqFileIndex++;
- }
- return dataSource.hasNextUnseqResource(curUnseqFileIndex);
- }
-
- @Override
- public TsFileResource getNextSeqFileResource(boolean isDelete) {
- TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
- if (isDelete) {
- curSeqFileIndex++;
- }
- return tsFileResource;
- }
-
- @Override
- public TsFileResource getNextUnseqFileResource(boolean isDelete) {
- TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
- if (isDelete) {
- curUnseqFileIndex++;
- }
- return tsFileResource;
- }
- }
-
- public TimeOrderUtils getOrderUtils() {
- return orderUtils;
- }
-
- @TestOnly
- public Filter getValueFilter() {
- return valueFilter;
- }
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index 591831c07e..7eebfd3511 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -1586,8 +1586,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- FragmentInstanceContext.createFragmentInstanceContextForCompaction(
- EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
seqResources,
unseqResources,
true);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
index 9ce94bac17..f94c09ed70 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
@@ -35,19 +35,15 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.wal.recover.WALRecoverManager;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
-import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -263,40 +259,35 @@ public class RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCo
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int readIndex = 0, size = batchData.getPositionCount();
+ readIndex < size;
+ readIndex++) {
+ long currentTime = batchData.getTimeByIndex(readIndex);
+ long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((450 <= currentTime && currentTime < 550)
+ || (550 <= currentTime && currentTime < 650))) {
+ assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((20 <= currentTime && currentTime < 220)
+ || (250 <= currentTime && currentTime < 450)
+ || (480 <= currentTime && currentTime < 680))) {
+ assertEquals(currentTime + 10000, currentValue);
} else {
- assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ assertEquals(currentTime, currentValue);
}
count++;
- batchData.next();
}
}
tsFilesReader.close();
@@ -395,40 +386,35 @@ public class RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCo
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int readIndex = 0, size = batchData.getPositionCount();
+ readIndex < size;
+ readIndex++) {
+ long currentTime = batchData.getTimeByIndex(readIndex);
+ long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((450 <= currentTime && currentTime < 550)
+ || (550 <= currentTime && currentTime < 650))) {
+ assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((20 <= currentTime && currentTime < 220)
+ || (250 <= currentTime && currentTime < 450)
+ || (480 <= currentTime && currentTime < 680))) {
+ assertEquals(currentTime + 10000, currentValue);
} else {
- assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ assertEquals(currentTime, currentValue);
}
count++;
- batchData.next();
}
}
tsFilesReader.close();
@@ -502,40 +488,35 @@ public class RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCo
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int readIndex = 0, size = batchData.getPositionCount();
+ readIndex < size;
+ readIndex++) {
+ long currentTime = batchData.getTimeByIndex(readIndex);
+ long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((450 <= currentTime && currentTime < 550)
+ || (550 <= currentTime && currentTime < 650))) {
+ assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((20 <= currentTime && currentTime < 220)
+ || (250 <= currentTime && currentTime < 450)
+ || (480 <= currentTime && currentTime < 680))) {
+ assertEquals(currentTime + 10000, currentValue);
} else {
- assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ assertEquals(currentTime, currentValue);
}
count++;
- batchData.next();
}
}
tsFilesReader.close();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
index 0693e6ebfa..db8f0f4a80 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest;
import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.reader.IDataBlockReader;
+import org.apache.iotdb.db.engine.compaction.reader.SeriesDataBlockReader;
import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
@@ -33,18 +35,15 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.wal.recover.WALRecoverManager;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
-import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -159,40 +158,35 @@ public class RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int readIndex = 0, size = batchData.getPositionCount();
+ readIndex < size;
+ readIndex++) {
+ long currentTime = batchData.getTimeByIndex(readIndex);
+ long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((450 <= currentTime && currentTime < 550)
+ || (550 <= currentTime && currentTime < 650))) {
+ assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((20 <= currentTime && currentTime < 220)
+ || (250 <= currentTime && currentTime < 450)
+ || (480 <= currentTime && currentTime < 680))) {
+ assertEquals(currentTime + 10000, currentValue);
} else {
- assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ assertEquals(currentTime, currentValue);
}
count++;
- batchData.next();
}
}
tsFilesReader.close();
@@ -265,40 +259,35 @@ public class RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int readIndex = 0, size = batchData.getPositionCount();
+ readIndex < size;
+ readIndex++) {
+ long currentTime = batchData.getTimeByIndex(readIndex);
+ long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((450 <= currentTime && currentTime < 550)
+ || (550 <= currentTime && currentTime < 650))) {
+ assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((20 <= currentTime && currentTime < 220)
+ || (250 <= currentTime && currentTime < 450)
+ || (480 <= currentTime && currentTime < 680))) {
+ assertEquals(currentTime + 10000, currentValue);
} else {
- assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ assertEquals(currentTime, currentValue);
}
count++;
- batchData.next();
}
}
tsFilesReader.close();
@@ -397,40 +386,35 @@ public class RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
seqResources,
unseqResources,
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int readIndex = 0, size = batchData.getPositionCount();
+ readIndex < size;
+ readIndex++) {
+ long currentTime = batchData.getTimeByIndex(readIndex);
+ long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((450 <= currentTime && currentTime < 550)
+ || (550 <= currentTime && currentTime < 650))) {
+ assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((20 <= currentTime && currentTime < 220)
+ || (250 <= currentTime && currentTime < 450)
+ || (480 <= currentTime && currentTime < 680))) {
+ assertEquals(currentTime + 10000, currentValue);
} else {
- assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ assertEquals(currentTime, currentValue);
}
count++;
- batchData.next();
}
}
tsFilesReader.close();
@@ -504,40 +488,35 @@ public class RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr
COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
Collections.singletonList("s" + j),
schemas);
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
TSDataType.VECTOR,
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- while (batchData.hasCurrent()) {
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int readIndex = 0, size = batchData.getPositionCount();
+ readIndex < size;
+ readIndex++) {
+ long currentTime = batchData.getTimeByIndex(readIndex);
+ long currentValue = batchData.getColumn(0).getLong(readIndex);
if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
- && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
- || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
- assertEquals(
- batchData.currentTime() + 20000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((450 <= currentTime && currentTime < 550)
+ || (550 <= currentTime && currentTime < 650))) {
+ assertEquals(currentTime + 20000, currentValue);
} else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
- && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
- || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
- || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
- assertEquals(
- batchData.currentTime() + 10000,
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ && ((20 <= currentTime && currentTime < 220)
+ || (250 <= currentTime && currentTime < 450)
+ || (480 <= currentTime && currentTime < 680))) {
+ assertEquals(currentTime + 10000, currentValue);
} else {
- assertEquals(
- batchData.currentTime(),
- ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+ assertEquals(currentTime, currentValue);
}
count++;
- batchData.next();
}
}
tsFilesReader.close();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
index ff67346d45..142a6b6232 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionT
import org.apache.iotdb.db.engine.compaction.log.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer;
import org.apache.iotdb.db.engine.compaction.performer.impl.FastCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.reader.IDataBlockReader;
+import org.apache.iotdb.db.engine.compaction.reader.SeriesDataBlockReader;
import org.apache.iotdb.db.engine.compaction.task.CompactionRecoverTask;
import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
@@ -39,14 +41,12 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.SchemaTestUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
@@ -99,21 +99,19 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -189,20 +187,18 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
tsFilesReader =
- new SeriesRawDataBatchReader(
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -222,21 +218,19 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -301,20 +295,18 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
tsFilesReader =
- new SeriesRawDataBatchReader(
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -334,21 +326,19 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -415,20 +405,18 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
tsFilesReader =
- new SeriesRawDataBatchReader(
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -448,21 +436,19 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -534,20 +520,18 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ measurementSchemas[0].getMeasurementId());
tsFilesReader =
- new SeriesRawDataBatchReader(
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true).subList(3, 6),
new ArrayList<>(),
- null,
- null,
true);
count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -785,21 +769,19 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -852,20 +834,18 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
TimeSeriesMetadataCache.getInstance().clear();
ChunkCache.getInstance().clear();
tsFilesReader =
- new SeriesRawDataBatchReader(
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true).subList(0, 5),
new ArrayList<>(),
- null,
- null,
true);
count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -885,21 +865,19 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -954,20 +932,18 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
tsFilesReader =
- new SeriesRawDataBatchReader(
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -988,21 +964,19 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -1059,20 +1033,18 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ measurementSchemas[0].getMeasurementId());
logger.warn("TsFiles in list is {}", tsFileManager.getTsFileList(true));
tsFilesReader =
- new SeriesRawDataBatchReader(
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -1103,21 +1075,19 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -1146,21 +1116,19 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}
@@ -1204,21 +1172,19 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
+ IDataBlockReader tsFilesReader =
+ new SeriesDataBlockReader(
path,
measurementSchemas[0].getType(),
- EnvironmentUtils.TEST_QUERY_CONTEXT,
+ EnvironmentUtils.TEST_QUERY_FI_CONTEXT,
tsFileManager.getTsFileList(true),
new ArrayList<>(),
- null,
- null,
true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ TsBlock batchData = tsFilesReader.nextBatch();
+ for (int i = 0, size = batchData.getPositionCount(); i < size; i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getColumn(0).getDouble(i), 0.001);
count++;
}
}