You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/20 02:54:02 UTC
[iotdb] branch master updated: [IOTDB-2844] Implementation of SeriesAggregateScanOperator and AggregateOperator - Part I (#5502)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e0ecf5354e [IOTDB-2844] Implementation of SeriesAggregateScanOperator and AggregateOperator - Part I (#5502)
e0ecf5354e is described below
commit e0ecf5354e099b214d2f2e863050b51e2c3f40cc
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Apr 20 10:53:57 2022 +0800
[IOTDB-2844] Implementation of SeriesAggregateScanOperator and AggregateOperator - Part I (#5502)
---
.../apache/iotdb/db/mpp/execution/DataDriver.java | 4 +
.../iotdb/db/mpp/execution/DataDriverContext.java | 1 +
.../iotdb/db/mpp/execution/SchemaDriver.java | 1 +
.../db/mpp/execution/SchemaDriverContext.java | 1 +
.../source/SeriesAggregateScanOperator.java | 420 ++++++++++++++++++++-
.../db/mpp/operator/source/SeriesScanUtil.java | 10 +-
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 28 +-
.../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java | 5 +
.../db/mpp/sql/planner/plan/FragmentInstance.java | 1 +
.../plan/node/source/SeriesAggregateScanNode.java | 45 ++-
.../statement/component/GroupByTimeComponent.java | 73 ++++
.../query/dataset/groupby/GroupByFillDataSet.java | 8 +-
.../query/dataset/groupby/GroupByTimeDataSet.java | 30 +-
.../dataset/groupby/GroupByTimeEngineDataSet.java | 24 +-
.../groupby/GroupByWithValueFilterDataSet.java | 6 +-
.../groupby/GroupByWithoutValueFilterDataSet.java | 9 +-
.../iotdb/db/query/reader/series/SeriesReader.java | 3 +
.../timerangeiterator/AggrWindowIterator.java | 38 +-
.../timerangeiterator/ITimeRangeIterator.java | 8 +-
.../timerangeiterator/PreAggrWindowIterator.java | 38 +-
.../PreAggrWindowWithNaturalMonthIterator.java | 50 ++-
.../SingleTimeWindowIterator.java | 65 ++++
.../operator/SeriesAggregateScanOperatorTest.java | 373 ++++++++++++++++++
.../db/mpp/operator/SeriesScanOperatorTest.java | 6 +-
.../db/mpp/sql/plan/QueryLogicalPlanUtil.java | 8 +
.../source/SeriesAggregateScanNodeSerdeTest.java | 7 +-
.../dataset/groupby/GroupByTimeDataSetTest.java | 74 ++--
.../query/reader/series/SeriesReaderTestUtil.java | 8 +
.../iotdb/db/utils/TimeRangeIteratorTest.java | 230 +++++++----
.../iotdb/tsfile/read/common/block/TsBlock.java | 22 +-
.../tsfile/read/common/block/TsBlockBuilder.java | 5 +
.../common/block/column/BinaryColumnBuilder.java | 11 +
.../common/block/column/BooleanColumnBuilder.java | 11 +
.../read/common/block/column/ColumnBuilder.java | 5 +
.../common/block/column/DoubleColumnBuilder.java | 11 +
.../common/block/column/FloatColumnBuilder.java | 11 +
.../read/common/block/column/IntColumnBuilder.java | 11 +
.../common/block/column/LongColumnBuilder.java | 11 +
.../read/common/block/column/TimeColumn.java | 4 +
.../common/block/column/TimeColumnBuilder.java | 11 +
40 files changed, 1473 insertions(+), 214 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 21aebe0214..5ae66b14d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -53,6 +53,10 @@ import java.util.stream.Collectors;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
+/**
+ * One DataDriver is responsible for one FragmentInstance of a data query, which may contain
+ * several series.
+ */
@NotThreadSafe
public class DataDriver implements Driver {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
index abd6ea4b90..dc08957a7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.List;
+/** TODO Add javadoc for context */
public class DataDriverContext extends DriverContext {
private final List<PartialPath> paths;
private final Filter timeFilter;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 4020db4593..844fe417ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
+/** One SchemaDriver is used to execute one FragmentInstance which is for metadata query. */
@NotThreadSafe
public class SchemaDriver implements Driver {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
index a859df2b77..8781fa3137 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+/** TODO Add javadoc for context */
public class SchemaDriverContext extends DriverContext {
private final ISchemaRegion schemaRegion;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index c4b1015529..79586e3c4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -19,18 +19,130 @@
package org.apache.iotdb.db.mpp.operator.source;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator;
+import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This operator performs the aggregation calculation for one series based on the global time
+ * range and the time split parameter.
+ *
+ * <p>Every time next() is invoked, one tsBlock holding the result of the current time window is
+ * returned. In the sliding window case, the current time window is a pre-aggregation window. If
+ * there is no time split parameter, i.e. aggregation without groupBy, just one tsBlock is returned.
+ */
public class SeriesAggregateScanOperator implements DataSourceOperator {
+
+ private final OperatorContext operatorContext;
+ private final PlanNodeId sourceId;
+ private final SeriesScanUtil seriesScanUtil;
+ private final boolean ascending;
+ private List<AggregateResult> aggregateResultList;
+
+ private ITimeRangeIterator timeRangeIterator;
+ // current interval of aggregation window [curStartTime, curEndTime)
+ private TimeRange curTimeRange;
+
+ private TsBlockSingleColumnIterator preCachedData;
+ // used for resetting the preCachedData to the last read index
+ private int lastReadIndex;
+
+ private TsBlockBuilder tsBlockBuilder;
+ private TsBlock resultTsBlock;
+ private boolean hasCachedTsBlock = false;
+ private boolean finished = false;
+
+ public SeriesAggregateScanOperator(
+ PlanNodeId sourceId,
+ PartialPath seriesPath,
+ Set<String> allSensors,
+ OperatorContext context,
+ List<AggregationType> aggregateFuncList,
+ Filter timeFilter,
+ boolean ascending,
+ GroupByTimeComponent groupByTimeParameter) {
+ this.sourceId = sourceId;
+ this.operatorContext = context;
+ this.ascending = ascending;
+ this.seriesScanUtil =
+ new SeriesScanUtil(
+ seriesPath,
+ allSensors,
+ seriesPath.getSeriesType(),
+ context.getInstanceContext(),
+ timeFilter,
+ null,
+ ascending);
+ aggregateResultList = new ArrayList<>(aggregateFuncList.size());
+ for (AggregationType aggregationType : aggregateFuncList) {
+ aggregateResultList.add(
+ AggregateResultFactory.getAggrResultByType(
+ aggregationType,
+ seriesPath.getSeriesType(),
+ seriesScanUtil.getOrderUtils().getAscending()));
+ }
+ tsBlockBuilder =
+ new TsBlockBuilder(
+ aggregateFuncList.stream()
+ .map(
+ functionType ->
+ SchemaUtils.getSeriesTypeByPath(seriesPath, functionType.name()))
+ .collect(Collectors.toList()));
+ this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
+ }
+
+ /**
+ * If groupByTimeParameter is null, it's an aggregation query without down sampling. Such a
+ * query has only one time window and its result set does not contain a timestamp, so it doesn't
+ * matter what the time range returns.
+ */
+ public ITimeRangeIterator initTimeRangeIterator(GroupByTimeComponent groupByTimeParameter) {
+ if (groupByTimeParameter == null) {
+ return new SingleTimeWindowIterator(0, Long.MAX_VALUE);
+ } else {
+ return TimeRangeIteratorFactory.getTimeRangeIterator(
+ groupByTimeParameter.getStartTime(),
+ groupByTimeParameter.getEndTime(),
+ groupByTimeParameter.getInterval(),
+ groupByTimeParameter.getSlidingStep(),
+ ascending,
+ groupByTimeParameter.isIntervalByMonth(),
+ groupByTimeParameter.isSlidingStepByMonth(),
+ groupByTimeParameter.getInterval() > groupByTimeParameter.getSlidingStep());
+ }
+ }
+
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
+ // TODO
@Override
public ListenableFuture<Void> isBlocked() {
return DataSourceOperator.super.isBlocked();
@@ -38,14 +150,97 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
@Override
public TsBlock next() {
+ if (hasCachedTsBlock || hasNext()) {
+ hasCachedTsBlock = false;
+ return resultTsBlock;
+ }
return null;
}
@Override
public boolean hasNext() {
- return false;
+ if (hasCachedTsBlock) {
+ return true;
+ }
+ try {
+ if (!timeRangeIterator.hasNextTimeRange()) {
+ return false;
+ }
+ curTimeRange = timeRangeIterator.nextTimeRange();
+
+ // 1. Clear previous aggregation result
+ for (AggregateResult result : aggregateResultList) {
+ result.reset();
+ }
+
+ // 2. Calculate aggregation result based on current time window
+ if (calcFromCacheData(curTimeRange)) {
+ updateResultTsBlockUsingAggregateResult();
+ return true;
+ }
+
+ // read page data firstly
+ if (readAndCalcFromPage(curTimeRange)) {
+ updateResultTsBlockUsingAggregateResult();
+ return true;
+ }
+
+ // read chunk data secondly
+ if (readAndCalcFromChunk(curTimeRange)) {
+ updateResultTsBlockUsingAggregateResult();
+ return true;
+ }
+
+ // read from file lastly
+ while (seriesScanUtil.hasNextFile()) {
+ Statistics fileStatistics = seriesScanUtil.currentFileStatistics();
+ if (fileStatistics.getStartTime() >= curTimeRange.getMax()) {
+ if (ascending) {
+ updateResultTsBlockUsingAggregateResult();
+ return true;
+ } else {
+ seriesScanUtil.skipCurrentFile();
+ continue;
+ }
+ }
+ // calc from fileMetaData
+ if (canUseCurrentFileStatistics()
+ && curTimeRange.contains(fileStatistics.getStartTime(), fileStatistics.getEndTime())) {
+ calcFromStatistics(fileStatistics);
+ seriesScanUtil.skipCurrentFile();
+ continue;
+ }
+
+ // read chunk
+ if (readAndCalcFromChunk(curTimeRange)) {
+ updateResultTsBlockUsingAggregateResult();
+ return true;
+ }
+ }
+
+ updateResultTsBlockUsingAggregateResult();
+ return true;
+ } catch (IOException e) {
+ throw new RuntimeException("Error while scanning the file", e);
+ }
+ }
+
+ private void updateResultTsBlockUsingAggregateResult() {
+ // TODO AVG
+ tsBlockBuilder.reset();
+ TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ // Use start time of current time range as time column
+ timeColumnBuilder.writeLong(curTimeRange.getMin());
+ ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+ for (int i = 0; i < aggregateResultList.size(); i++) {
+ columnBuilders[i].writeObject(aggregateResultList.get(i).getResult());
+ }
+ tsBlockBuilder.declarePosition();
+ resultTsBlock = tsBlockBuilder.build();
+ hasCachedTsBlock = true;
}
+ // TODO Implement it later?
@Override
public void close() throws Exception {
DataSourceOperator.super.close();
@@ -53,14 +248,229 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
@Override
public boolean isFinished() {
- return false;
+ return finished || (finished = !hasNext()); // finished only once hasNext() reports no more data
}
@Override
public PlanNodeId getSourceId() {
- return null;
+ return sourceId;
}
@Override
- public void initQueryDataSource(QueryDataSource dataSource) {}
+ public void initQueryDataSource(QueryDataSource dataSource) {
+ seriesScanUtil.initQueryDataSource(dataSource);
+ }
+
+ /** @return true if the result has already been obtained */
+ private boolean calcFromCacheData(TimeRange curTimeRange) throws IOException {
+ calcFromBatch(preCachedData, curTimeRange);
+ // The result is calculated from the cache
+ return (preCachedData != null
+ && (ascending
+ ? preCachedData.getEndTime() >= curTimeRange.getMax()
+ : preCachedData.getStartTime() < curTimeRange.getMin()))
+ || isEndCalc();
+ }
+
+ @SuppressWarnings("squid:S3776")
+ private void calcFromBatch(TsBlockSingleColumnIterator blockIterator, TimeRange curTimeRange)
+ throws IOException {
+ // check if the batchData does not contain points in current interval
+ if (!satisfied(blockIterator, curTimeRange)) {
+ return;
+ }
+
+ for (AggregateResult result : aggregateResultList) {
+ // current agg method has been calculated
+ if (result.hasFinalResult()) {
+ continue;
+ }
+ // lazy reset batch data for calculation
+ blockIterator.setRowIndex(lastReadIndex);
+ // skip points that cannot be calculated
+ skipOutOfTimeRangePoints(blockIterator, curTimeRange);
+
+ if (blockIterator.hasNext()) {
+ result.updateResultFromPageData(
+ blockIterator, curTimeRange.getMin(), curTimeRange.getMax());
+ }
+ }
+
+ // reset the last position to current Index
+ lastReadIndex = blockIterator.getRowIndex();
+
+ // can calc for next interval
+ if (blockIterator.hasNext()) {
+ preCachedData = blockIterator;
+ }
+ }
+
+ // skip points that cannot be calculated
+ private void skipOutOfTimeRangePoints(
+ TsBlockSingleColumnIterator tsBlockIterator, TimeRange curTimeRange) {
+ if (ascending) {
+ while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) {
+ tsBlockIterator.next();
+ }
+ } else {
+ while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
+ tsBlockIterator.next();
+ }
+ }
+ }
+
+ private boolean satisfied(TsBlockSingleColumnIterator tsBlockIterator, TimeRange timeRange) {
+ if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
+ return false;
+ }
+
+ if (ascending
+ && (tsBlockIterator.getEndTime() < timeRange.getMin()
+ || tsBlockIterator.currentTime() >= timeRange.getMax())) {
+ return false;
+ }
+ if (!ascending
+ && (tsBlockIterator.getStartTime() >= timeRange.getMax()
+ || tsBlockIterator.currentTime() < timeRange.getMin())) {
+ preCachedData = tsBlockIterator;
+ return false;
+ }
+ return true;
+ }
+
+ private boolean isEndCalc() {
+ for (AggregateResult result : aggregateResultList) {
+ if (!result.hasFinalResult()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ private boolean readAndCalcFromPage(TimeRange curTimeRange) throws IOException {
+ while (seriesScanUtil.hasNextPage()) {
+ Statistics pageStatistics = seriesScanUtil.currentPageStatistics();
+ // must be non overlapped page
+ if (pageStatistics != null) {
+ // There are no more eligible points in the current time range
+ if (pageStatistics.getStartTime() >= curTimeRange.getMax()) {
+ if (ascending) {
+ return true;
+ } else {
+ seriesScanUtil.skipCurrentPage();
+ continue;
+ }
+ }
+ // can use pageHeader
+ if (canUseCurrentPageStatistics()
+ && curTimeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
+ calcFromStatistics(pageStatistics);
+ seriesScanUtil.skipCurrentPage();
+ if (isEndCalc()) {
+ return true;
+ }
+ continue;
+ }
+ }
+
+ // calc from page data
+ TsBlockSingleColumnIterator tsBlockIterator =
+ seriesScanUtil.nextPage().getTsBlockSingleColumnIterator();
+ if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
+ continue;
+ }
+
+ // reset the last position to current Index
+ lastReadIndex = tsBlockIterator.getRowIndex();
+
+ // stop calc and cache current batchData
+ if (ascending && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
+ preCachedData = tsBlockIterator;
+ return true;
+ }
+
+ // calc from batch data
+ calcFromBatch(tsBlockIterator, curTimeRange);
+
+ // judge whether the calculation finished
+ if (isEndCalc()
+ || (tsBlockIterator.hasNext()
+ && (ascending
+ ? tsBlockIterator.currentTime() >= curTimeRange.getMax()
+ : tsBlockIterator.currentTime() < curTimeRange.getMin()))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean readAndCalcFromChunk(TimeRange curTimeRange) throws IOException {
+ while (seriesScanUtil.hasNextChunk()) {
+ Statistics chunkStatistics = seriesScanUtil.currentChunkStatistics();
+ if (chunkStatistics.getStartTime() >= curTimeRange.getMax()) {
+ if (ascending) {
+ return true;
+ } else {
+ seriesScanUtil.skipCurrentChunk();
+ continue;
+ }
+ }
+ // calc from chunkMetaData
+ if (canUseCurrentChunkStatistics()
+ && curTimeRange.contains(chunkStatistics.getStartTime(), chunkStatistics.getEndTime())) {
+ calcFromStatistics(chunkStatistics);
+ seriesScanUtil.skipCurrentChunk();
+ continue;
+ }
+ // read page
+ if (readAndCalcFromPage(curTimeRange)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void calcFromStatistics(Statistics statistics) {
+ try {
+ for (AggregateResult result : aggregateResultList) {
+ if (result.hasFinalResult()) {
+ continue;
+ }
+ result.updateResultFromStatistics(statistics);
+ }
+ } catch (QueryProcessException e) {
+ throw new RuntimeException("Error while updating result using statistics", e);
+ }
+ }
+
+ public boolean canUseCurrentFileStatistics() throws IOException {
+ Statistics fileStatistics = seriesScanUtil.currentFileStatistics();
+ return !seriesScanUtil.isFileOverlapped()
+ && containedByTimeFilter(fileStatistics)
+ && !seriesScanUtil.currentFileModified();
+ }
+
+ public boolean canUseCurrentChunkStatistics() throws IOException {
+ Statistics chunkStatistics = seriesScanUtil.currentChunkStatistics();
+ return !seriesScanUtil.isChunkOverlapped()
+ && containedByTimeFilter(chunkStatistics)
+ && !seriesScanUtil.currentChunkModified();
+ }
+
+ public boolean canUseCurrentPageStatistics() throws IOException {
+ Statistics currentPageStatistics = seriesScanUtil.currentPageStatistics();
+ if (currentPageStatistics == null) {
+ return false;
+ }
+ return !seriesScanUtil.isPageOverlapped()
+ && containedByTimeFilter(currentPageStatistics)
+ && !seriesScanUtil.currentPageModified();
+ }
+
+ private boolean containedByTimeFilter(Statistics statistics) {
+ Filter timeFilter = seriesScanUtil.getTimeFilter();
+ return timeFilter == null
+ || timeFilter.containStartEndTime(statistics.getStartTime(), statistics.getEndTime());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index c6493bedf4..fad08b703d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -136,12 +136,11 @@ public class SeriesScanUtil {
if (ascending) {
this.orderUtils = new AscTimeOrderUtils();
mergeReader = getPriorityMergeReader();
- this.curUnseqFileIndex = 0;
} else {
this.orderUtils = new DescTimeOrderUtils();
mergeReader = getDescPriorityMergeReader();
- this.curUnseqFileIndex = 0;
}
+ this.curUnseqFileIndex = 0;
unSeqTimeSeriesMetadata =
new PriorityQueue<>(
@@ -271,6 +270,9 @@ public class SeriesScanUtil {
if (firstChunkMetadata != null) {
return true;
+ // hasNextFile() has not been invoked
+ } else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) {
+ return false;
}
while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
@@ -1085,6 +1087,10 @@ public class SeriesScanUtil {
return timeFilter;
}
+ public TimeOrderUtils getOrderUtils() {
+ return orderUtils;
+ }
+
private class VersionPageReader {
protected PriorityMergeReader.MergeReaderPriority version;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index e02becf770..e5d60439bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.operator.schema.SchemaMergeOperator;
import org.apache.iotdb.db.mpp.operator.schema.TimeSeriesSchemaScanOperator;
import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
import org.apache.iotdb.db.mpp.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
@@ -76,7 +77,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
/**
- * used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
+ * Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
* Operator tree, but in the future, we may split one fragment instance into multiple pipeline to
* run a fragment instance parallel and take full advantage of multi-cores
*/
@@ -214,7 +215,29 @@ public class LocalExecutionPlanner {
@Override
public Operator visitSeriesAggregate(
SeriesAggregateScanNode node, LocalExecutionPlanContext context) {
- return super.visitSeriesAggregate(node, context);
+ PartialPath seriesPath = node.getSeriesPath();
+ boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ SeriesAggregateScanNode.class.getSimpleName());
+
+ SeriesAggregateScanOperator aggregateScanOperator =
+ new SeriesAggregateScanOperator(
+ node.getPlanNodeId(),
+ seriesPath,
+ node.getAllSensors(),
+ operatorContext,
+ node.getAggregateFuncList(),
+ node.getTimeFilter(),
+ ascending,
+ node.getGroupByTimeParameter());
+
+ context.addSourceOperator(aggregateScanOperator);
+ context.addPath(seriesPath);
+
+ return aggregateScanOperator;
}
@Override
@@ -355,6 +378,7 @@ public class LocalExecutionPlanner {
private static class LocalExecutionPlanContext {
private final FragmentInstanceContext instanceContext;
private final List<PartialPath> paths;
+ // Used to lock corresponding query resources
private final List<DataSourceOperator> sourceOperators;
private ISinkHandle sinkHandle;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
index 4b0ee9c1ff..db7497ea61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
@@ -109,6 +109,10 @@ public class QueryPlanBuilder {
for (Map.Entry<String, Map<PartialPath, Set<AggregationType>>> entry :
deviceNameToAggregationsMap.entrySet()) {
String deviceName = entry.getKey();
+ Set<String> allSensors =
+ entry.getValue().keySet().stream()
+ .map(PartialPath::getMeasurement)
+ .collect(Collectors.toSet());
for (PartialPath path : entry.getValue().keySet()) {
deviceNameToSourceNodesMap
@@ -117,6 +121,7 @@ public class QueryPlanBuilder {
new SeriesAggregateScanNode(
context.getQueryId().genPlanNodeId(),
path,
+ allSensors,
new ArrayList<>(entry.getValue().get(path)),
scanOrder,
timeFilter,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index a3065c8c03..7058301ce8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
import java.util.Objects;
public class FragmentInstance implements IConsensusRequest {
+
private final FragmentInstanceId id;
private final QueryType type;
// The reference of PlanFragment which this instance is generated from
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index dcde49fd9a..fde2b903a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -45,8 +45,10 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -71,6 +73,9 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
private final PartialPath seriesPath;
private final List<AggregationType> aggregateFuncList;
+ // all the sensors in seriesPath's device of current query
+ private Set<String> allSensors;
+
// The order to traverse the data.
// Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
// The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
@@ -90,12 +95,14 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
public SeriesAggregateScanNode(
PlanNodeId id,
PartialPath seriesPath,
+ Set<String> allSensors,
List<AggregationType> aggregateFuncList,
OrderBy scanOrder,
Filter timeFilter,
GroupByTimeComponent groupByTimeParameter) {
super(id);
this.seriesPath = seriesPath;
+ this.allSensors = allSensors;
this.aggregateFuncList = aggregateFuncList;
this.scanOrder = scanOrder;
this.timeFilter = timeFilter;
@@ -109,6 +116,22 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
.collect(Collectors.toList());
}
+ public OrderBy getScanOrder() {
+ return scanOrder;
+ }
+
+ public Set<String> getAllSensors() {
+ return allSensors;
+ }
+
+ public Filter getTimeFilter() {
+ return timeFilter;
+ }
+
+ public GroupByTimeComponent getGroupByTimeParameter() {
+ return groupByTimeParameter;
+ }
+
@Override
public List<PlanNode> getChildren() {
return ImmutableList.of();
@@ -167,6 +190,10 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.SERIES_AGGREGATE_SCAN.serialize(byteBuffer);
seriesPath.serialize(byteBuffer);
+ ReadWriteIOUtils.write(allSensors.size(), byteBuffer);
+ for (String sensor : allSensors) {
+ ReadWriteIOUtils.write(sensor, byteBuffer);
+ }
ReadWriteIOUtils.write(aggregateFuncList.size(), byteBuffer);
for (AggregationType aggregationType : aggregateFuncList) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -180,12 +207,17 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
}
ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
timeFilter.serialize(byteBuffer);
- // TODO serialize groupByTimeParameter
+ groupByTimeParameter.serialize(byteBuffer);
regionReplicaSet.serializeImpl(byteBuffer);
}
public static SeriesAggregateScanNode deserialize(ByteBuffer byteBuffer) {
PartialPath partialPath = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
+ int allSensorsSize = ReadWriteIOUtils.readInt(byteBuffer);
+ Set<String> allSensors = new HashSet<>();
+ for (int i = 0; i < allSensorsSize; i++) {
+ allSensors.add(ReadWriteIOUtils.readString(byteBuffer));
+ }
int aggregateFuncSize = ReadWriteIOUtils.readInt(byteBuffer);
List<AggregationType> aggregateFuncList = new ArrayList<>();
for (int i = 0; i < aggregateFuncSize; i++) {
@@ -194,12 +226,18 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
Filter timeFilter = FilterFactory.deserialize(byteBuffer);
- // TODO serialize groupByTimeParameter
+ GroupByTimeComponent groupByTimeComponent = GroupByTimeComponent.deserialize(byteBuffer);
RegionReplicaSet regionReplicaSet = RegionReplicaSet.deserializeImpl(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
SeriesAggregateScanNode seriesAggregateScanNode =
new SeriesAggregateScanNode(
- planNodeId, partialPath, aggregateFuncList, scanOrder, timeFilter, null);
+ planNodeId,
+ partialPath,
+ allSensors,
+ aggregateFuncList,
+ scanOrder,
+ timeFilter,
+ groupByTimeComponent);
seriesAggregateScanNode.regionReplicaSet = regionReplicaSet;
return seriesAggregateScanNode;
}
@@ -233,6 +271,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
SeriesAggregateScanNode that = (SeriesAggregateScanNode) o;
return Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
&& Objects.equals(seriesPath, that.seriesPath)
+ && Objects.equals(allSensors, that.allSensors)
&& Objects.equals(
aggregateFuncList.stream().sorted().collect(Collectors.toList()),
that.aggregateFuncList.stream().sorted().collect(Collectors.toList()))
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
index dbb2daa415..026297a331 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
@@ -20,6 +20,10 @@
package org.apache.iotdb.db.mpp.sql.statement.component;
import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
/** This class maintains information of {@code GROUP BY} clause. */
public class GroupByTimeComponent extends StatementNode {
@@ -43,6 +47,28 @@ public class GroupByTimeComponent extends StatementNode {
public GroupByTimeComponent() {}
+ public GroupByTimeComponent(
+ long startTime, long endTime, long interval, long slidingStep, boolean leftCRightO) {
+ this(startTime, endTime, interval, slidingStep, false, false, leftCRightO);
+ }
+
+ public GroupByTimeComponent(
+ long startTime,
+ long endTime,
+ long interval,
+ long slidingStep,
+ boolean isIntervalByMonth,
+ boolean isSlidingStepByMonth,
+ boolean leftCRightO) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.interval = interval;
+ this.slidingStep = slidingStep;
+ this.isIntervalByMonth = isIntervalByMonth;
+ this.isSlidingStepByMonth = isSlidingStepByMonth;
+ this.leftCRightO = leftCRightO;
+ }
+
public boolean isLeftCRightO() {
return leftCRightO;
}
@@ -98,4 +124,51 @@ public class GroupByTimeComponent extends StatementNode {
public void setIntervalByMonth(boolean isIntervalByMonth) {
this.isIntervalByMonth = isIntervalByMonth;
}
+
+ public void serialize(ByteBuffer buffer) {
+ ReadWriteIOUtils.write(startTime, buffer);
+ ReadWriteIOUtils.write(endTime, buffer);
+ ReadWriteIOUtils.write(interval, buffer);
+ ReadWriteIOUtils.write(slidingStep, buffer);
+ ReadWriteIOUtils.write(isIntervalByMonth, buffer);
+ ReadWriteIOUtils.write(isSlidingStepByMonth, buffer);
+ ReadWriteIOUtils.write(leftCRightO, buffer);
+ }
+
+ public static GroupByTimeComponent deserialize(ByteBuffer buffer) {
+ GroupByTimeComponent groupByTimeComponent = new GroupByTimeComponent();
+ groupByTimeComponent.setStartTime(ReadWriteIOUtils.readLong(buffer));
+ groupByTimeComponent.setEndTime(ReadWriteIOUtils.readLong(buffer));
+ groupByTimeComponent.setInterval(ReadWriteIOUtils.readLong(buffer));
+ groupByTimeComponent.setSlidingStep(ReadWriteIOUtils.readLong(buffer));
+ groupByTimeComponent.setIntervalByMonth(ReadWriteIOUtils.readBool(buffer));
+ groupByTimeComponent.setSlidingStepByMonth(ReadWriteIOUtils.readBool(buffer));
+ groupByTimeComponent.setLeftCRightO(ReadWriteIOUtils.readBool(buffer));
+ return groupByTimeComponent;
+ }
+
+ public boolean equals(Object obj) {
+ if (!(obj instanceof GroupByTimeComponent)) {
+ return false;
+ }
+ GroupByTimeComponent other = (GroupByTimeComponent) obj;
+ return this.startTime == other.startTime
+ && this.endTime == other.endTime
+ && this.interval == other.interval
+ && this.slidingStep == other.slidingStep
+ && this.isSlidingStepByMonth == other.isSlidingStepByMonth
+ && this.isIntervalByMonth == other.isIntervalByMonth
+ && this.leftCRightO == other.leftCRightO;
+ }
+
+ public int hashCode() {
+ return Objects.hash(
+ startTime,
+ endTime,
+ interval,
+ slidingStep,
+ isIntervalByMonth,
+ isSlidingStepByMonth,
+ leftCRightO);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index aa34aa3677..5b0c9a7607 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -153,11 +153,11 @@ public class GroupByFillDataSet extends GroupByTimeDataSet {
RowRecord record;
long curTimestamp;
if (leftCRightO) {
- curTimestamp = curStartTime;
- record = new RowRecord(curStartTime);
+ curTimestamp = curAggrTimeRange.getMin();
+ record = new RowRecord(curAggrTimeRange.getMin());
} else {
- curTimestamp = curEndTime - 1;
- record = new RowRecord(curEndTime - 1);
+ curTimestamp = curAggrTimeRange.getMax() - 1;
+ record = new RowRecord(curAggrTimeRange.getMax() - 1);
}
for (int i = 0; i < aggregations.size(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index 8c269f7fd7..16d039e370 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.db.query.executor.groupby.SlidingWindowGroupByExecutor;
import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
@@ -45,13 +45,11 @@ public abstract class GroupByTimeDataSet extends QueryDataSet {
protected long endTime;
// current interval of aggregation window [curStartTime, curEndTime)
- protected long curStartTime;
- protected long curEndTime;
+ protected TimeRange curAggrTimeRange;
protected boolean hasCachedTimeInterval;
// current interval of pre-aggregation window [curStartTime, curEndTime)
- protected long curPreAggrStartTime;
- protected long curPreAggrEndTime;
+ protected TimeRange curPreAggrTimeRange;
protected boolean leftCRightO;
protected boolean isIntervalByMonth = false;
@@ -125,18 +123,10 @@ public abstract class GroupByTimeDataSet extends QueryDataSet {
true);
// find the first aggregation interval
- Pair<Long, Long> retTimeRange;
- retTimeRange = aggrWindowIterator.getFirstTimeRange();
-
- curStartTime = retTimeRange.left;
- curEndTime = retTimeRange.right;
+ curAggrTimeRange = aggrWindowIterator.nextTimeRange();
// find the first pre-aggregation interval
- Pair<Long, Long> retPerAggrTimeRange;
- retPerAggrTimeRange = preAggrWindowIterator.getFirstTimeRange();
-
- curPreAggrStartTime = retPerAggrTimeRange.left;
- curPreAggrEndTime = retPerAggrTimeRange.right;
+ curPreAggrTimeRange = preAggrWindowIterator.nextTimeRange();
this.hasCachedTimeInterval = true;
@@ -151,12 +141,10 @@ public abstract class GroupByTimeDataSet extends QueryDataSet {
}
// find the next aggregation interval
- Pair<Long, Long> nextTimeRange = aggrWindowIterator.getNextTimeRange(curStartTime);
- if (nextTimeRange == null) {
+ if (!aggrWindowIterator.hasNextTimeRange()) {
return false;
}
- curStartTime = nextTimeRange.left;
- curEndTime = nextTimeRange.right;
+ curAggrTimeRange = aggrWindowIterator.nextTimeRange();
hasCachedTimeInterval = true;
return true;
@@ -170,8 +158,8 @@ public abstract class GroupByTimeDataSet extends QueryDataSet {
}
@TestOnly
- public Pair<Long, Long> nextTimePartition() {
+ public TimeRange nextTimePartition() {
hasCachedTimeInterval = false;
- return new Pair<>(curStartTime, curEndTime);
+ return curAggrTimeRange;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeEngineDataSet.java
index 944f5896bc..d822f08780 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeEngineDataSet.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import java.io.IOException;
@@ -51,9 +51,9 @@ public abstract class GroupByTimeEngineDataSet extends GroupByTimeDataSet {
protected RowRecord constructRowRecord(AggregateResult[] aggregateResultList) {
RowRecord record;
if (leftCRightO) {
- record = new RowRecord(curStartTime);
+ record = new RowRecord(curAggrTimeRange.getMin());
} else {
- record = new RowRecord(curEndTime - 1);
+ record = new RowRecord(curAggrTimeRange.getMax() - 1);
}
for (AggregateResult res : aggregateResultList) {
if (res == null) {
@@ -66,22 +66,24 @@ public abstract class GroupByTimeEngineDataSet extends GroupByTimeDataSet {
}
protected boolean isEndCal() {
- if (curPreAggrStartTime == -1) {
+ if (curPreAggrTimeRange.getMin() == -1) {
return true;
}
- return ascending ? curPreAggrStartTime >= curEndTime : curPreAggrEndTime <= curStartTime;
+ return ascending
+ ? curPreAggrTimeRange.getMin() >= curAggrTimeRange.getMax()
+ : curPreAggrTimeRange.getMax() <= curAggrTimeRange.getMin();
}
// find the next pre-aggregation interval
protected void updatePreAggrInterval() {
- Pair<Long, Long> retPerAggrTimeRange;
- retPerAggrTimeRange = preAggrWindowIterator.getNextTimeRange(curPreAggrStartTime);
+ TimeRange retPerAggrTimeRange = null;
+ if (preAggrWindowIterator.hasNextTimeRange()) {
+ retPerAggrTimeRange = preAggrWindowIterator.nextTimeRange();
+ }
if (retPerAggrTimeRange != null) {
- curPreAggrStartTime = retPerAggrTimeRange.left;
- curPreAggrEndTime = retPerAggrTimeRange.right;
+ curPreAggrTimeRange = retPerAggrTimeRange;
} else {
- curPreAggrStartTime = -1;
- curPreAggrEndTime = -1;
+ curPreAggrTimeRange = new TimeRange(-1, -1);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 6034f22540..a5ab69bd91 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -194,10 +194,12 @@ public class GroupByWithValueFilterDataSet extends GroupByTimeEngineDataSet {
curAggregateResults = new AggregateResult[paths.size()];
for (SlidingWindowGroupByExecutor slidingWindowGroupByExecutor :
slidingWindowGroupByExecutors) {
- slidingWindowGroupByExecutor.setTimeRange(curStartTime, curEndTime);
+ slidingWindowGroupByExecutor.setTimeRange(
+ curAggrTimeRange.getMin(), curAggrTimeRange.getMax());
}
while (!isEndCal()) {
- AggregateResult[] aggregations = calcResult(curPreAggrStartTime, curPreAggrEndTime);
+ AggregateResult[] aggregations =
+ calcResult(curPreAggrTimeRange.getMin(), curPreAggrTimeRange.getMax());
for (int i = 0; i < aggregations.length; i++) {
slidingWindowGroupByExecutors[i].update(aggregations[i].clone());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 1292e2c90f..2561c8c412 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -194,7 +194,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByTimeEngineDataSet {
curAggregateResults = new AggregateResult[paths.size()];
for (SlidingWindowGroupByExecutor slidingWindowGroupByExecutor :
slidingWindowGroupByExecutors) {
- slidingWindowGroupByExecutor.setTimeRange(curStartTime, curEndTime);
+ slidingWindowGroupByExecutor.setTimeRange(
+ curAggrTimeRange.getMin(), curAggrTimeRange.getMax());
}
try {
while (!isEndCal()) {
@@ -204,7 +205,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByTimeEngineDataSet {
List<Integer> indexes = entry.getValue();
GroupByExecutor groupByExecutor = pathExecutors.get(path);
List<AggregateResult> aggregations =
- groupByExecutor.calcResult(curPreAggrStartTime, curPreAggrEndTime);
+ groupByExecutor.calcResult(
+ curPreAggrTimeRange.getMin(), curPreAggrTimeRange.getMax());
for (int i = 0; i < aggregations.size(); i++) {
int resultIndex = indexes.get(i);
slidingWindowGroupByExecutors[resultIndex].update(aggregations.get(i).clone());
@@ -217,7 +219,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByTimeEngineDataSet {
List<List<Integer>> indexesList = entry.getValue();
AlignedGroupByExecutor groupByExecutor = alignedPathExecutors.get(path);
List<List<AggregateResult>> aggregationsList =
- groupByExecutor.calcAlignedResult(curPreAggrStartTime, curPreAggrEndTime);
+ groupByExecutor.calcAlignedResult(
+ curPreAggrTimeRange.getMin(), curPreAggrTimeRange.getMax());
for (int i = 0; i < path.getMeasurementList().size(); i++) {
List<AggregateResult> aggregations = aggregationsList.get(i);
List<Integer> indexes = indexesList.get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 136ce73968..87a9741a4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -331,6 +331,9 @@ public class SeriesReader {
if (firstChunkMetadata != null) {
return true;
+ // hasNextFile() has not been invoked
+ } else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) {
+ return false;
}
while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/AggrWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/AggrWindowIterator.java
index d4e17d1c32..a86383bdfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/AggrWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/AggrWindowIterator.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.utils.timerangeiterator;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import static org.apache.iotdb.db.qp.utils.DatetimeUtils.MS_TO_MONTH;
@@ -35,7 +35,6 @@ public class AggrWindowIterator implements ITimeRangeIterator {
// total query [startTime, endTime)
private final long startTime;
private final long endTime;
-
private final long interval;
private final long slidingStep;
@@ -43,6 +42,8 @@ public class AggrWindowIterator implements ITimeRangeIterator {
private final boolean isSlidingStepByMonth;
private final boolean isIntervalByMonth;
+ private TimeRange curTimeRange;
+
public AggrWindowIterator(
long startTime,
long endTime,
@@ -61,7 +62,7 @@ public class AggrWindowIterator implements ITimeRangeIterator {
}
@Override
- public Pair<Long, Long> getFirstTimeRange() {
+ public TimeRange getFirstTimeRange() {
if (isAscending) {
return getLeftmostTimeRange();
} else {
@@ -69,7 +70,7 @@ public class AggrWindowIterator implements ITimeRangeIterator {
}
}
- private Pair<Long, Long> getLeftmostTimeRange() {
+ private TimeRange getLeftmostTimeRange() {
long retEndTime;
if (isIntervalByMonth) {
// calculate interval length by natural month based on startTime
@@ -78,10 +79,10 @@ public class AggrWindowIterator implements ITimeRangeIterator {
} else {
retEndTime = Math.min(startTime + interval, endTime);
}
- return new Pair<>(startTime, retEndTime);
+ return new TimeRange(startTime, retEndTime);
}
- private Pair<Long, Long> getRightmostTimeRange() {
+ private TimeRange getRightmostTimeRange() {
long retStartTime;
long retEndTime;
long queryRange = endTime - startTime;
@@ -106,12 +107,18 @@ public class AggrWindowIterator implements ITimeRangeIterator {
} else {
retEndTime = Math.min(retStartTime + interval, endTime);
}
- return new Pair<>(retStartTime, retEndTime);
+ return new TimeRange(retStartTime, retEndTime);
}
@Override
- public Pair<Long, Long> getNextTimeRange(long curStartTime) {
+ public boolean hasNextTimeRange() {
+ if (curTimeRange == null) {
+ curTimeRange = getFirstTimeRange();
+ return true;
+ }
+
long retStartTime, retEndTime;
+ long curStartTime = curTimeRange.getMin();
if (isAscending) {
if (isSlidingStepByMonth) {
retStartTime = DatetimeUtils.calcIntervalByMonth(curStartTime, (int) (slidingStep));
@@ -120,7 +127,7 @@ public class AggrWindowIterator implements ITimeRangeIterator {
}
// This is an open interval , [0-100)
if (retStartTime >= endTime) {
- return null;
+ return false;
}
} else {
if (isSlidingStepByMonth) {
@@ -129,7 +136,7 @@ public class AggrWindowIterator implements ITimeRangeIterator {
retStartTime = curStartTime - slidingStep;
}
if (retStartTime < startTime) {
- return null;
+ return false;
}
}
@@ -139,7 +146,16 @@ public class AggrWindowIterator implements ITimeRangeIterator {
retEndTime = retStartTime + interval;
}
retEndTime = Math.min(retEndTime, endTime);
- return new Pair<>(retStartTime, retEndTime);
+ curTimeRange = new TimeRange(retStartTime, retEndTime);
+ return true;
+ }
+
+ @Override
+ public TimeRange nextTimeRange() {
+ if (curTimeRange != null || hasNextTimeRange()) {
+ return curTimeRange;
+ }
+ return null;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/ITimeRangeIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/ITimeRangeIterator.java
index 91c5a337db..2a809557e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/ITimeRangeIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/ITimeRangeIterator.java
@@ -19,19 +19,21 @@
package org.apache.iotdb.db.utils.timerangeiterator;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
/** This interface used for iteratively generating aggregated time windows in GROUP BY query. */
public interface ITimeRangeIterator {
/** return the first time range by sorting order */
- Pair<Long, Long> getFirstTimeRange();
+ TimeRange getFirstTimeRange();
+ /** @return whether the current iterator has a next time range */
+ boolean hasNextTimeRange();
/**
* return the next time range according to curStartTime (the start time of the last returned time
* range)
*/
- Pair<Long, Long> getNextTimeRange(long curStartTime);
+ TimeRange nextTimeRange();
boolean isAscending();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowIterator.java
index 62cc7b0297..87e520d030 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowIterator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.utils.timerangeiterator;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
/**
* This class iteratively generates pre-aggregated time windows.
@@ -32,7 +32,6 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
// total query [startTime, endTime)
private final long startTime;
private final long endTime;
-
private final long interval;
private final long slidingStep;
@@ -43,6 +42,8 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
private boolean isIntervalCyclicChange = false;
private int intervalCnt = 0;
+ private TimeRange curTimeRange;
+
public PreAggrWindowIterator(
long startTime, long endTime, long interval, long slidingStep, boolean isAscending) {
this.startTime = startTime;
@@ -54,7 +55,7 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
}
@Override
- public Pair<Long, Long> getFirstTimeRange() {
+ public TimeRange getFirstTimeRange() {
if (isAscending) {
return getLeftmostTimeRange();
} else {
@@ -62,13 +63,13 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
}
}
- private Pair<Long, Long> getLeftmostTimeRange() {
+ private TimeRange getLeftmostTimeRange() {
long retEndTime = Math.min(startTime + curInterval, endTime);
updateIntervalAndStep();
- return new Pair<>(startTime, retEndTime);
+ return new TimeRange(startTime, retEndTime);
}
- private Pair<Long, Long> getRightmostTimeRange() {
+ private TimeRange getRightmostTimeRange() {
long retStartTime;
long retEndTime;
long intervalNum = (long) Math.ceil((endTime - startTime) / (double) slidingStep);
@@ -79,27 +80,42 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
}
retEndTime = Math.min(retStartTime + curInterval, endTime);
updateIntervalAndStep();
- return new Pair<>(retStartTime, retEndTime);
+ return new TimeRange(retStartTime, retEndTime);
}
@Override
- public Pair<Long, Long> getNextTimeRange(long curStartTime) {
+ public boolean hasNextTimeRange() {
+ if (curTimeRange == null) {
+ curTimeRange = getFirstTimeRange();
+ return true;
+ }
+
long retStartTime, retEndTime;
+ long curStartTime = curTimeRange.getMin();
if (isAscending) {
retStartTime = curStartTime + curSlidingStep;
// This is an open interval , [0-100)
if (retStartTime >= endTime) {
- return null;
+ return false;
}
} else {
retStartTime = curStartTime - curSlidingStep;
if (retStartTime < startTime) {
- return null;
+ return false;
}
}
retEndTime = Math.min(retStartTime + curInterval, endTime);
updateIntervalAndStep();
- return new Pair<>(retStartTime, retEndTime);
+ curTimeRange = new TimeRange(retStartTime, retEndTime);
+ return true;
+ }
+
+ @Override
+ public TimeRange nextTimeRange() {
+ if (curTimeRange != null || hasNextTimeRange()) {
+ return curTimeRange;
+ }
+ return null;
}
private void initIntervalAndStep() {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
index a5426bcf23..f56d288119 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.utils.timerangeiterator;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
public class PreAggrWindowWithNaturalMonthIterator implements ITimeRangeIterator {
@@ -33,6 +33,7 @@ public class PreAggrWindowWithNaturalMonthIterator implements ITimeRangeIterator
private long curStartTimeForIterator;
private long lastEndTime;
+ private TimeRange curTimeRange;
public PreAggrWindowWithNaturalMonthIterator(
long startTime,
@@ -57,48 +58,61 @@ public class PreAggrWindowWithNaturalMonthIterator implements ITimeRangeIterator
}
@Override
- public Pair<Long, Long> getFirstTimeRange() {
+ public TimeRange getFirstTimeRange() {
long retStartTime = timeBoundaryHeap.pollFirst();
lastEndTime = timeBoundaryHeap.first();
- return new Pair<>(retStartTime, lastEndTime);
+ return new TimeRange(retStartTime, lastEndTime);
}
@Override
- public Pair<Long, Long> getNextTimeRange(long curStartTime) {
+ public boolean hasNextTimeRange() {
+ if (curTimeRange == null) {
+ curTimeRange = getFirstTimeRange();
+ return true;
+ }
+
if (lastEndTime >= curStartTimeForIterator) {
tryToExpandHeap();
}
if (timeBoundaryHeap.isEmpty()) {
- return null;
+ return false;
}
long retStartTime = timeBoundaryHeap.pollFirst();
if (retStartTime >= curStartTimeForIterator) {
tryToExpandHeap();
}
if (timeBoundaryHeap.isEmpty()) {
- return null;
+ return false;
}
lastEndTime = timeBoundaryHeap.first();
- return new Pair<>(retStartTime, lastEndTime);
+ curTimeRange = new TimeRange(retStartTime, lastEndTime);
+ return true;
+ }
+
+ @Override
+ public TimeRange nextTimeRange() {
+ if (curTimeRange != null || hasNextTimeRange()) {
+ return curTimeRange;
+ }
+ return null;
}
private void initHeap() {
- Pair<Long, Long> firstTimeRange = aggrWindowIterator.getFirstTimeRange();
- timeBoundaryHeap.add(firstTimeRange.left);
- timeBoundaryHeap.add(firstTimeRange.right);
- curStartTimeForIterator = firstTimeRange.left;
+ TimeRange firstTimeRange = aggrWindowIterator.nextTimeRange();
+ timeBoundaryHeap.add(firstTimeRange.getMin());
+ timeBoundaryHeap.add(firstTimeRange.getMax());
+ curStartTimeForIterator = firstTimeRange.getMin();
tryToExpandHeap();
}
private void tryToExpandHeap() {
- Pair<Long, Long> curTimeRange = aggrWindowIterator.getNextTimeRange(curStartTimeForIterator);
- while (curTimeRange != null && timeBoundaryHeap.size() < HEAP_MAX_SIZE) {
- timeBoundaryHeap.add(curTimeRange.left);
- timeBoundaryHeap.add(curTimeRange.right);
- curStartTimeForIterator = curTimeRange.left;
-
- curTimeRange = aggrWindowIterator.getNextTimeRange(curStartTimeForIterator);
+ TimeRange timeRangeToExpand = null;
+ while (aggrWindowIterator.hasNextTimeRange() && timeBoundaryHeap.size() < HEAP_MAX_SIZE) {
+ timeRangeToExpand = aggrWindowIterator.nextTimeRange();
+ timeBoundaryHeap.add(timeRangeToExpand.getMin());
+ timeBoundaryHeap.add(timeRangeToExpand.getMax());
+ curStartTimeForIterator = timeRangeToExpand.getMin();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/SingleTimeWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/SingleTimeWindowIterator.java
new file mode 100644
index 0000000000..3470f98834
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/SingleTimeWindowIterator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.timerangeiterator;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+
+/** Used for aggregation with only one time window, i.e. aggregation without group by. */
+public class SingleTimeWindowIterator implements ITimeRangeIterator {
+
+ // total query [startTime, endTime)
+ private final long startTime;
+ private final long endTime;
+
+ private TimeRange curTimeRange;
+
+ public SingleTimeWindowIterator(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ @Override
+ public TimeRange getFirstTimeRange() {
+ return new TimeRange(startTime, endTime);
+ }
+
+ @Override
+ public boolean hasNextTimeRange() {
+ if (curTimeRange == null) {
+ curTimeRange = getFirstTimeRange();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public TimeRange nextTimeRange() {
+ if (curTimeRange != null || hasNextTimeRange()) {
+ return curTimeRange;
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isAscending() {
+ return false;
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
new file mode 100644
index 0000000000..d1c71f2eb4
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
+import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link SeriesAggregateScanOperator} on the INT32 series {@code device0.sensor0} of the
+ * storage group {@code root.SeriesScanOperatorTest}.
+ *
+ * <p>The data files are created by {@link SeriesReaderTestUtil#setUp} before each test and removed
+ * by {@link SeriesReaderTestUtil#tearDown} afterwards. Every test drains the operator with {@code
+ * hasNext()}/{@code next()}, checks the first row of each returned {@link TsBlock} and finally
+ * checks how many blocks were returned.
+ */
+public class SeriesAggregateScanOperatorTest {
+
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.SeriesScanOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  /** Prepares the test files and fills the schema, device and resource lists. */
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+  }
+
+  /** Cleans up the files created in {@link #setUp()}. */
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  /** COUNT without time filter and without group by: one block with count 500. */
+  @Test
+  public void testAggregationWithoutTimeFilter() throws IllegalPathException {
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), null, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  /** COUNT and SUM in one operator: each aggregation is exposed as its own column. */
+  @Test
+  public void testMultiAggregationFuncWithoutTimeFilter1() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.COUNT);
+    aggregationTypes.add(AggregationType.SUM);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
+      assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  /** Six value/time aggregations without time filter; columns follow the list order. */
+  @Test
+  public void testMultiAggregationFuncWithoutTimeFilter2() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MIN_TIME);
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
+      assertEquals(499, resultTsBlock.getColumn(3).getLong(0));
+      assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
+      assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  /** COUNT with the time filter {@code time >= 120}: one block with count 380. */
+  @Test
+  public void testAggregationWithTimeFilter1() throws IllegalPathException {
+    Filter timeFilter = TimeFilter.gtEq(120);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      // expected value first, so that a failure message reports the values the right way round
+      assertEquals(380, resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  /** COUNT with the time filter {@code time <= 379}: one block with count 380. */
+  @Test
+  public void testAggregationWithTimeFilter2() throws IllegalPathException {
+    Filter timeFilter = TimeFilter.ltEq(379);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      // expected value first, so that a failure message reports the values the right way round
+      assertEquals(380, resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  /** COUNT with the time filter {@code 100 <= time <= 399}: one block with count 300. */
+  @Test
+  public void testAggregationWithTimeFilter3() throws IllegalPathException {
+    Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      // expected value first, so that a failure message reports the values the right way round
+      assertEquals(300, resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  /** Six value/time aggregations restricted by the time filter {@code 100 <= time <= 399}. */
+  @Test
+  public void testMultiAggregationWithTimeFilter() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MIN_TIME);
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregationTypes, timeFilter, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(20100, resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(399, resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(100, resultTsBlock.getColumn(2).getLong(0));
+      assertEquals(399, resultTsBlock.getColumn(3).getLong(0));
+      assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
+      assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  /** Group-by COUNT without time filter: four blocks at times 0, 100, 200, 300 with count 100. */
+  @Test
+  public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
+    int[] result = new int[] {100, 100, 100, 100};
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[count], resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
+  /** Group-by COUNT combined with the time filter {@code 120 <= time <= 379}. */
+  @Test
+  public void testGroupByWithGlobalTimeFilter() throws IllegalPathException {
+    int[] result = new int[] {0, 80, 100, 80};
+    Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT),
+            timeFilter,
+            true,
+            groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[count], resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
+  /** Group by with four aggregations; {@code result[i][j]} is column i of the j-th block. */
+  @Test
+  public void testGroupByWithMultiFunction() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {20000, 20100, 10200, 10300},
+          {20099, 20199, 299, 399},
+          {20099, 20199, 10259, 10379},
+          {20000, 20100, 260, 380}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[0][count], resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(result[1][count], resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(result[2][count], resultTsBlock.getColumn(2).getInt(0));
+      assertEquals(result[3][count], resultTsBlock.getColumn(3).getInt(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
+  /**
+   * Group-by COUNT with the component (0, 399, 100, 50): eight blocks, one every 50 time units,
+   * each with count 50.
+   */
+  @Test
+  public void testGroupBySlidingTimeWindow() throws IllegalPathException {
+    int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 50, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[count], resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(result.length, count);
+  }
+
+  /**
+   * Group-by COUNT with the component (0, 149, 50, 30): the returned block times are irregular and
+   * are listed in {@code timeColumn}.
+   */
+  @Test
+  public void testGroupBySlidingTimeWindow2() throws IllegalPathException {
+    int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
+    int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 149, 50, 30, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[count], resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(timeColumn.length, count);
+  }
+
+  /**
+   * Same group-by component as {@link #testGroupBySlidingTimeWindow2()} with four aggregations;
+   * {@code result[i][j]} is column i of the j-th block.
+   */
+  @Test
+  public void testGroupBySlidingWindowWithMultiFunction() throws IllegalPathException {
+    int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
+    int[][] result =
+        new int[][] {
+          {20000, 20020, 20030, 20050, 20060, 20080, 20090, 20110, 20120, 20140},
+          {20019, 20029, 20049, 20059, 20079, 20089, 20109, 20119, 20139, 20148},
+          {20019, 20029, 20049, 20059, 20079, 20089, 20109, 20119, 20139, 20148},
+          {20000, 20020, 20030, 20050, 20060, 20080, 20090, 20110, 20120, 20140}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 149, 50, 30, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[0][count], resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(result[1][count], resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(result[2][count], resultTsBlock.getColumn(2).getInt(0));
+      assertEquals(result[3][count], resultTsBlock.getColumn(3).getInt(0));
+      count++;
+    }
+    assertEquals(timeColumn.length, count);
+  }
+
+  /**
+   * Builds a {@link SeriesAggregateScanOperator} for {@code device0.sensor0} and attaches the
+   * prepared sequence and unsequence resources as its query data source.
+   *
+   * @param aggregateFuncList aggregations to compute, passed to the operator as given
+   * @param timeFilter time filter passed to the operator; the tests pass {@code null} for none
+   * @param ascending scan order flag passed to the operator
+   * @param groupByTimeParameter group-by settings; the tests pass {@code null} for plain
+   *     aggregation
+   * @return the operator, ready to be drained with {@code hasNext()}/{@code next()}
+   * @throws IllegalPathException if the measurement path cannot be created
+   */
+  public SeriesAggregateScanOperator initSeriesAggregateScanOperator(
+      List<AggregationType> aggregateFuncList,
+      Filter timeFilter,
+      boolean ascending,
+      GroupByTimeComponent groupByTimeParameter)
+      throws IllegalPathException {
+    MeasurementPath measurementPath =
+        new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+    Set<String> allSensors = Sets.newHashSet("sensor0");
+    QueryId queryId = new QueryId("stub_query");
+    AtomicReference<FragmentInstanceState> state =
+        new AtomicReference<>(FragmentInstanceState.RUNNING);
+    FragmentInstanceContext fragmentInstanceContext =
+        new FragmentInstanceContext(
+            new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+    PlanNodeId planNodeId = new PlanNodeId("1");
+    // NOTE(review): the context is registered under SeriesScanOperator's simple name although the
+    // operator under test is SeriesAggregateScanOperator; looks like a copy-paste leftover.
+    fragmentInstanceContext.addOperatorContext(
+        1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        new SeriesAggregateScanOperator(
+            planNodeId,
+            measurementPath,
+            allSensors,
+            fragmentInstanceContext.getOperatorContexts().get(0),
+            aggregateFuncList,
+            timeFilter,
+            ascending,
+            groupByTimeParameter);
+    seriesAggregateScanOperator.initQueryDataSource(
+        new QueryDataSource(seqResources, unSeqResources));
+    return seriesAggregateScanOperator;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 3793504970..78e6821bcd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -37,13 +37,13 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@@ -76,8 +76,7 @@ public class SeriesScanOperatorTest {
try {
MeasurementPath measurementPath =
new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
- Set<String> allSensors = new HashSet<>();
- allSensors.add("sensor0");
+ Set<String> allSensors = Sets.newHashSet("sensor0");
QueryId queryId = new QueryId("stub_query");
AtomicReference<FragmentInstanceState> state =
new AtomicReference<>(FragmentInstanceState.RUNNING);
@@ -87,6 +86,7 @@ public class SeriesScanOperatorTest {
PlanNodeId planNodeId = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
SeriesScanOperator seriesScanOperator =
new SeriesScanOperator(
planNodeId,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java
index 57792799ae..e902078fa6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java
@@ -231,6 +231,7 @@ public class QueryLogicalPlanUtil {
new SeriesAggregateScanNode(
new PlanNodeId("test_query_0"),
schemaMap.get("root.sg.d1.s1"),
+ Sets.newHashSet("s1"),
aggregationTypeList1,
OrderBy.TIMESTAMP_DESC,
timeFilter,
@@ -239,6 +240,7 @@ public class QueryLogicalPlanUtil {
new SeriesAggregateScanNode(
new PlanNodeId("test_query_1"),
schemaMap.get("root.sg.d1.s2"),
+ Sets.newHashSet("s2"),
aggregationTypeList2,
OrderBy.TIMESTAMP_DESC,
timeFilter,
@@ -247,6 +249,7 @@ public class QueryLogicalPlanUtil {
new SeriesAggregateScanNode(
new PlanNodeId("test_query_2"),
schemaMap.get("root.sg.d2.s1"),
+ Sets.newHashSet("s1"),
aggregationTypeList1,
OrderBy.TIMESTAMP_DESC,
timeFilter,
@@ -255,6 +258,7 @@ public class QueryLogicalPlanUtil {
new SeriesAggregateScanNode(
new PlanNodeId("test_query_3"),
schemaMap.get("root.sg.d2.s2"),
+ Sets.newHashSet("s2"),
aggregationTypeList2,
OrderBy.TIMESTAMP_DESC,
timeFilter,
@@ -318,6 +322,7 @@ public class QueryLogicalPlanUtil {
new SeriesAggregateScanNode(
new PlanNodeId("test_query_0"),
schemaMap.get("root.sg.d1.s1"),
+ Sets.newHashSet("s1"),
aggregationTypeList1,
OrderBy.TIMESTAMP_DESC,
timeFilter,
@@ -326,6 +331,7 @@ public class QueryLogicalPlanUtil {
new SeriesAggregateScanNode(
new PlanNodeId("test_query_1"),
schemaMap.get("root.sg.d1.s2"),
+ Sets.newHashSet("s2"),
aggregationTypeList2,
OrderBy.TIMESTAMP_DESC,
timeFilter,
@@ -334,6 +340,7 @@ public class QueryLogicalPlanUtil {
new SeriesAggregateScanNode(
new PlanNodeId("test_query_2"),
schemaMap.get("root.sg.d2.s1"),
+ Sets.newHashSet("s1"),
aggregationTypeList1,
OrderBy.TIMESTAMP_DESC,
timeFilter,
@@ -342,6 +349,7 @@ public class QueryLogicalPlanUtil {
new SeriesAggregateScanNode(
new PlanNodeId("test_query_3"),
schemaMap.get("root.sg.d2.s2"),
+ Sets.newHashSet("s2"),
aggregationTypeList2,
OrderBy.TIMESTAMP_DESC,
timeFilter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
index b66c3e4e9f..67fb28feab 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
@@ -26,11 +26,13 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.operator.In;
+import org.apache.commons.compress.utils.Sets;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -50,14 +52,17 @@ public class SeriesAggregateScanNodeSerdeTest {
st.add("s2");
List<AggregationType> aggregateFuncList = new ArrayList<>();
aggregateFuncList.add(AggregationType.MAX_TIME);
+ GroupByTimeComponent groupByTimeComponent =
+ new GroupByTimeComponent(1, 100, 1, 1, true, true, true);
SeriesAggregateScanNode seriesAggregateScanNode =
new SeriesAggregateScanNode(
new PlanNodeId("TestSeriesAggregateScanNode"),
new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN),
+ Sets.newHashSet("s1"),
aggregateFuncList,
OrderBy.TIMESTAMP_ASC,
new In<String>(st, VALUE_FILTER, true),
- null);
+ groupByTimeComponent);
seriesAggregateScanNode.setRegionReplicaSet(
new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()));
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
index 8dfe2245bf..bde500b3f1 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.query.aggregation.impl.CountAggrResult;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.junit.Assert;
import org.junit.Test;
@@ -55,9 +55,9 @@ public class GroupByTimeDataSetTest {
GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
- Assert.assertEquals(startTimeArray[cnt], pair.left);
- Assert.assertEquals(endTimeArray[cnt], pair.right);
+ TimeRange timeRange = groupByEngine.nextTimePartition();
+ Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+ Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
cnt++;
}
Assert.assertEquals(startTimeArray.length, cnt);
@@ -83,9 +83,9 @@ public class GroupByTimeDataSetTest {
GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
- Assert.assertEquals(startTimeArray[cnt], pair.left);
- Assert.assertEquals(endTimeArray[cnt], pair.right);
+ TimeRange timeRange = groupByEngine.nextTimePartition();
+ Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+ Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
cnt++;
}
Assert.assertEquals(startTimeArray.length, cnt);
@@ -111,9 +111,9 @@ public class GroupByTimeDataSetTest {
GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
- Assert.assertEquals(startTimeArray[cnt], pair.left);
- Assert.assertEquals(endTimeArray[cnt], pair.right);
+ TimeRange timeRange = groupByEngine.nextTimePartition();
+ Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+ Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
cnt++;
}
Assert.assertEquals(startTimeArray.length, cnt);
@@ -141,9 +141,9 @@ public class GroupByTimeDataSetTest {
GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
- Assert.assertEquals(startTimeArray[cnt], pair.left);
- Assert.assertEquals(endTimeArray[cnt], pair.right);
+ TimeRange timeRange = groupByEngine.nextTimePartition();
+ Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+ Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
cnt++;
}
Assert.assertEquals(startTimeArray.length, cnt);
@@ -170,9 +170,9 @@ public class GroupByTimeDataSetTest {
GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
- Assert.assertEquals(startTimeArray[cnt], pair.left);
- Assert.assertEquals(endTimeArray[cnt], pair.right);
+ TimeRange timeRange = groupByEngine.nextTimePartition();
+ Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+ Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
cnt++;
}
Assert.assertEquals(startTimeArray.length, cnt);
@@ -202,10 +202,10 @@ public class GroupByTimeDataSetTest {
GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
+ TimeRange timeRange = groupByEngine.nextTimePartition();
Assert.assertTrue(cnt < startTimeArray.length);
- Assert.assertEquals(startTimeArray[cnt], pair.left);
- Assert.assertEquals(endTimeArray[cnt], pair.right);
+ Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+ Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
cnt++;
}
Assert.assertEquals(startTimeArray.length, cnt);
@@ -240,10 +240,10 @@ public class GroupByTimeDataSetTest {
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
+ TimeRange timeRange = groupByEngine.nextTimePartition();
Assert.assertTrue(cnt < startTimeArray.length);
- Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
- Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+ Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+ Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
cnt++;
}
@@ -294,10 +294,10 @@ public class GroupByTimeDataSetTest {
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
+ TimeRange timeRange = groupByEngine.nextTimePartition();
Assert.assertTrue(cnt < startTimeArray.length);
- Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
- Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+ Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+ Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
cnt++;
}
@@ -334,10 +334,10 @@ public class GroupByTimeDataSetTest {
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
+ TimeRange timeRange = groupByEngine.nextTimePartition();
Assert.assertTrue(cnt < startTimeArray.length);
- Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
- Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+ Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+ Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
cnt++;
}
Assert.assertEquals(startTimeArray.length, cnt);
@@ -388,10 +388,10 @@ public class GroupByTimeDataSetTest {
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
+ TimeRange timeRange = groupByEngine.nextTimePartition();
Assert.assertTrue(cnt < startTimeArray.length);
- Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
- Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+ Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+ Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
cnt++;
}
Assert.assertEquals(startTimeArray.length, cnt);
@@ -442,10 +442,10 @@ public class GroupByTimeDataSetTest {
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
+ TimeRange timeRange = groupByEngine.nextTimePartition();
Assert.assertTrue(cnt < startTimeArray.length);
- Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
- Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+ Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+ Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
cnt++;
}
Assert.assertEquals(startTimeArray.length, cnt);
@@ -481,10 +481,10 @@ public class GroupByTimeDataSetTest {
int cnt = 0;
while (groupByEngine.hasNext()) {
- Pair pair = groupByEngine.nextTimePartition();
+ TimeRange timeRange = groupByEngine.nextTimePartition();
Assert.assertTrue(cnt < startTimeArray.length);
- Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
- Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+ Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+ Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
cnt++;
}
Assert.assertEquals(startTimeArray.length, cnt);
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index aa61459632..15266f5852 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -49,6 +49,14 @@ import java.util.Map;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
+/**
+ * This util contains 5 seqFiles and 5 unseqFiles by default.
+ *
+ * <p>Sequence time range of data: [0, 99], [100, 199], [200, 299], [300, 399], [400, 499]
+ *
+ * <p>UnSequence time range of data: [0, 19], [100, 139], [200, 259], [300, 379], [400, 499], [0,
+ * 199]
+ */
public class SeriesReaderTestUtil {
private static int seqFileNum = 5;
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TimeRangeIteratorTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TimeRangeIteratorTest.java
index 2057ff50f2..a5927b7a18 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TimeRangeIteratorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TimeRangeIteratorTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.utils;
import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.junit.Assert;
import org.junit.Test;
@@ -31,8 +31,17 @@ public class TimeRangeIteratorTest {
@Test
public void testNotSplitTimeRange() {
String[] res = {
- "<0,4>", "<3,7>", "<6,10>", "<9,13>", "<12,16>", "<15,19>", "<18,22>", "<21,25>", "<24,28>",
- "<27,31>", "<30,32>"
+ "[ 0 : 4 ]",
+ "[ 3 : 7 ]",
+ "[ 6 : 10 ]",
+ "[ 9 : 13 ]",
+ "[ 12 : 16 ]",
+ "[ 15 : 19 ]",
+ "[ 18 : 22 ]",
+ "[ 21 : 25 ]",
+ "[ 24 : 28 ]",
+ "[ 27 : 31 ]",
+ "[ 30 : 32 ]"
};
long startTime = 0, endTime = 32, interval = 4, slidingStep = 3;
@@ -53,25 +62,103 @@ public class TimeRangeIteratorTest {
@Test
public void testSplitTimeRange() {
String[] res4_1 = {
- "<0,1>", "<1,2>", "<2,3>", "<3,4>", "<4,5>", "<5,6>", "<6,7>", "<7,8>", "<8,9>", "<9,10>",
- "<10,11>", "<11,12>", "<12,13>", "<13,14>", "<14,15>", "<15,16>", "<16,17>", "<17,18>",
- "<18,19>", "<19,20>", "<20,21>", "<21,22>", "<22,23>", "<23,24>", "<24,25>", "<25,26>",
- "<26,27>", "<27,28>", "<28,29>", "<29,30>", "<30,31>", "<31,32>"
+ "[ 0 : 1 ]",
+ "[ 1 : 2 ]",
+ "[ 2 : 3 ]",
+ "[ 3 : 4 ]",
+ "[ 4 : 5 ]",
+ "[ 5 : 6 ]",
+ "[ 6 : 7 ]",
+ "[ 7 : 8 ]",
+ "[ 8 : 9 ]",
+ "[ 9 : 10 ]",
+ "[ 10 : 11 ]",
+ "[ 11 : 12 ]",
+ "[ 12 : 13 ]",
+ "[ 13 : 14 ]",
+ "[ 14 : 15 ]",
+ "[ 15 : 16 ]",
+ "[ 16 : 17 ]",
+ "[ 17 : 18 ]",
+ "[ 18 : 19 ]",
+ "[ 19 : 20 ]",
+ "[ 20 : 21 ]",
+ "[ 21 : 22 ]",
+ "[ 22 : 23 ]",
+ "[ 23 : 24 ]",
+ "[ 24 : 25 ]",
+ "[ 25 : 26 ]",
+ "[ 26 : 27 ]",
+ "[ 27 : 28 ]",
+ "[ 28 : 29 ]",
+ "[ 29 : 30 ]",
+ "[ 30 : 31 ]",
+ "[ 31 : 32 ]"
};
String[] res4_2 = {
- "<0,2>", "<2,4>", "<4,6>", "<6,8>", "<8,10>", "<10,12>", "<12,14>", "<14,16>", "<16,18>",
- "<18,20>", "<20,22>", "<22,24>", "<24,26>", "<26,28>", "<28,30>", "<30,32>"
+ "[ 0 : 2 ]",
+ "[ 2 : 4 ]",
+ "[ 4 : 6 ]",
+ "[ 6 : 8 ]",
+ "[ 8 : 10 ]",
+ "[ 10 : 12 ]",
+ "[ 12 : 14 ]",
+ "[ 14 : 16 ]",
+ "[ 16 : 18 ]",
+ "[ 18 : 20 ]",
+ "[ 20 : 22 ]",
+ "[ 22 : 24 ]",
+ "[ 24 : 26 ]",
+ "[ 26 : 28 ]",
+ "[ 28 : 30 ]",
+ "[ 30 : 32 ]"
};
String[] res4_3 = {
- "<0,1>", "<1,3>", "<3,4>", "<4,6>", "<6,7>", "<7,9>", "<9,10>", "<10,12>", "<12,13>",
- "<13,15>", "<15,16>", "<16,18>", "<18,19>", "<19,21>", "<21,22>", "<22,24>", "<24,25>",
- "<25,27>", "<27,28>", "<28,30>", "<30,31>", "<31,32>"
+ "[ 0 : 1 ]",
+ "[ 1 : 3 ]",
+ "[ 3 : 4 ]",
+ "[ 4 : 6 ]",
+ "[ 6 : 7 ]",
+ "[ 7 : 9 ]",
+ "[ 9 : 10 ]",
+ "[ 10 : 12 ]",
+ "[ 12 : 13 ]",
+ "[ 13 : 15 ]",
+ "[ 15 : 16 ]",
+ "[ 16 : 18 ]",
+ "[ 18 : 19 ]",
+ "[ 19 : 21 ]",
+ "[ 21 : 22 ]",
+ "[ 22 : 24 ]",
+ "[ 24 : 25 ]",
+ "[ 25 : 27 ]",
+ "[ 27 : 28 ]",
+ "[ 28 : 30 ]",
+ "[ 30 : 31 ]",
+ "[ 31 : 32 ]"
};
String[] res4_4 = {
- "<0,4>", "<4,8>", "<8,12>", "<12,16>", "<16,20>", "<20,24>", "<24,28>", "<28,32>"
+ "[ 0 : 4 ]",
+ "[ 4 : 8 ]",
+ "[ 8 : 12 ]",
+ "[ 12 : 16 ]",
+ "[ 16 : 20 ]",
+ "[ 20 : 24 ]",
+ "[ 24 : 28 ]",
+ "[ 28 : 32 ]"
+ };
+ String[] res4_5 = {
+ "[ 0 : 4 ]",
+ "[ 5 : 9 ]",
+ "[ 10 : 14 ]",
+ "[ 15 : 19 ]",
+ "[ 20 : 24 ]",
+ "[ 25 : 29 ]",
+ "[ 30 : 32 ]"
+ };
+ String[] res4_6 = {
+ "[ 0 : 4 ]", "[ 6 : 10 ]", "[ 12 : 16 ]", "[ 18 : 22 ]", "[ 24 : 28 ]", "[ 30 : 32 ]"
};
- String[] res4_5 = {"<0,4>", "<5,9>", "<10,14>", "<15,19>", "<20,24>", "<25,29>", "<30,32>"};
- String[] res4_6 = {"<0,4>", "<6,10>", "<12,16>", "<18,22>", "<24,28>", "<30,32>"};
checkRes(
TimeRangeIteratorFactory.getTimeRangeIterator(0, 32, 4, 1, true, false, false, true),
@@ -114,62 +201,62 @@ public class TimeRangeIteratorTest {
@Test
public void testNaturalMonthTimeRange() {
String[] res1 = {
- "<1604102400000,1606694400000>",
- "<1606694400000,1609372800000>",
- "<1609372800000,1612051200000>",
- "<1612051200000,1614470400000>",
- "<1614470400000,1617148800000>"
+ "[ 1604102400000 : 1606694400000 ]",
+ "[ 1606694400000 : 1609372800000 ]",
+ "[ 1609372800000 : 1612051200000 ]",
+ "[ 1612051200000 : 1614470400000 ]",
+ "[ 1614470400000 : 1617148800000 ]"
};
String[] res2 = {
- "<1604102400000,1604966400000>",
- "<1606694400000,1607558400000>",
- "<1609372800000,1610236800000>",
- "<1612051200000,1612915200000>",
- "<1614470400000,1615334400000>"
+ "[ 1604102400000 : 1604966400000 ]",
+ "[ 1606694400000 : 1607558400000 ]",
+ "[ 1609372800000 : 1610236800000 ]",
+ "[ 1612051200000 : 1612915200000 ]",
+ "[ 1614470400000 : 1615334400000 ]"
};
String[] res3 = {
- "<1604102400000,1606694400000>",
- "<1604966400000,1607558400000>",
- "<1605830400000,1608422400000>",
- "<1606694400000,1609372800000>",
- "<1607558400000,1610236800000>",
- "<1608422400000,1611100800000>",
- "<1609286400000,1611964800000>",
- "<1610150400000,1612828800000>",
- "<1611014400000,1613692800000>",
- "<1611878400000,1614470400000>",
- "<1612742400000,1615161600000>",
- "<1613606400000,1616025600000>",
- "<1614470400000,1617148800000>",
- "<1615334400000,1617148800000>",
- "<1616198400000,1617148800000>",
- "<1617062400000,1617148800000>"
+ "[ 1604102400000 : 1606694400000 ]",
+ "[ 1604966400000 : 1607558400000 ]",
+ "[ 1605830400000 : 1608422400000 ]",
+ "[ 1606694400000 : 1609372800000 ]",
+ "[ 1607558400000 : 1610236800000 ]",
+ "[ 1608422400000 : 1611100800000 ]",
+ "[ 1609286400000 : 1611964800000 ]",
+ "[ 1610150400000 : 1612828800000 ]",
+ "[ 1611014400000 : 1613692800000 ]",
+ "[ 1611878400000 : 1614470400000 ]",
+ "[ 1612742400000 : 1615161600000 ]",
+ "[ 1613606400000 : 1616025600000 ]",
+ "[ 1614470400000 : 1617148800000 ]",
+ "[ 1615334400000 : 1617148800000 ]",
+ "[ 1616198400000 : 1617148800000 ]",
+ "[ 1617062400000 : 1617148800000 ]"
};
String[] res4 = {
- "<1604102400000,1604966400000>",
- "<1604966400000,1605830400000>",
- "<1605830400000,1606694400000>",
- "<1606694400000,1607558400000>",
- "<1607558400000,1608422400000>",
- "<1608422400000,1609286400000>",
- "<1609286400000,1609372800000>",
- "<1609372800000,1610150400000>",
- "<1610150400000,1610236800000>",
- "<1610236800000,1611014400000>",
- "<1611014400000,1611100800000>",
- "<1611100800000,1611878400000>",
- "<1611878400000,1611964800000>",
- "<1611964800000,1612742400000>",
- "<1612742400000,1612828800000>",
- "<1612828800000,1613606400000>",
- "<1613606400000,1613692800000>",
- "<1613692800000,1614470400000>",
- "<1614470400000,1615161600000>",
- "<1615161600000,1615334400000>",
- "<1615334400000,1616025600000>",
- "<1616025600000,1616198400000>",
- "<1616198400000,1617062400000>",
- "<1617062400000,1617148800000>"
+ "[ 1604102400000 : 1604966400000 ]",
+ "[ 1604966400000 : 1605830400000 ]",
+ "[ 1605830400000 : 1606694400000 ]",
+ "[ 1606694400000 : 1607558400000 ]",
+ "[ 1607558400000 : 1608422400000 ]",
+ "[ 1608422400000 : 1609286400000 ]",
+ "[ 1609286400000 : 1609372800000 ]",
+ "[ 1609372800000 : 1610150400000 ]",
+ "[ 1610150400000 : 1610236800000 ]",
+ "[ 1610236800000 : 1611014400000 ]",
+ "[ 1611014400000 : 1611100800000 ]",
+ "[ 1611100800000 : 1611878400000 ]",
+ "[ 1611878400000 : 1611964800000 ]",
+ "[ 1611964800000 : 1612742400000 ]",
+ "[ 1612742400000 : 1612828800000 ]",
+ "[ 1612828800000 : 1613606400000 ]",
+ "[ 1613606400000 : 1613692800000 ]",
+ "[ 1613692800000 : 1614470400000 ]",
+ "[ 1614470400000 : 1615161600000 ]",
+ "[ 1615161600000 : 1615334400000 ]",
+ "[ 1615334400000 : 1616025600000 ]",
+ "[ 1616025600000 : 1616198400000 ]",
+ "[ 1616198400000 : 1617062400000 ]",
+ "[ 1617062400000 : 1617148800000 ]"
};
checkRes(
TimeRangeIteratorFactory.getTimeRangeIterator(
@@ -199,22 +286,13 @@ public class TimeRangeIteratorTest {
private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) {
boolean isAscending = timeRangeIterator.isAscending();
- long curStartTime;
int cnt = isAscending ? 0 : res.length - 1;
- // test first time range
- Pair<Long, Long> firstTimeRange = timeRangeIterator.getFirstTimeRange();
- Assert.assertEquals(res[cnt], firstTimeRange.toString());
- cnt += isAscending ? 1 : -1;
- curStartTime = firstTimeRange.left;
-
// test next time ranges
- Pair<Long, Long> curTimeRange = timeRangeIterator.getNextTimeRange(curStartTime);
- while (curTimeRange != null) {
+ while (timeRangeIterator.hasNextTimeRange()) {
+ TimeRange curTimeRange = timeRangeIterator.nextTimeRange();
Assert.assertEquals(res[cnt], curTimeRange.toString());
cnt += isAscending ? 1 : -1;
- curStartTime = curTimeRange.left;
- curTimeRange = timeRangeIterator.getNextTimeRange(curStartTime);
}
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index f6ae37a398..256b95498d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -110,6 +110,10 @@ public class TsBlock {
return positionCount;
}
+ public long getStartTime() {
+ return timeColumn.getStartTime();
+ }
+
public long getEndTime() {
return timeColumn.getEndTime();
}
@@ -191,7 +195,7 @@ public class TsBlock {
return new AlignedTsBlockIterator(0, subIndex);
}
- private class TsBlockSingleColumnIterator implements IPointReader, IBatchDataIterator {
+ public class TsBlockSingleColumnIterator implements IPointReader, IBatchDataIterator {
protected int rowIndex;
protected int columnIndex;
@@ -261,6 +265,22 @@ public class TsBlock {
@Override
public void close() {}
+
+ public long getEndTime() {
+ return TsBlock.this.getEndTime();
+ }
+
+ public long getStartTime() {
+ return TsBlock.this.getStartTime();
+ }
+
+ public int getRowIndex() {
+ return rowIndex;
+ }
+
+ public void setRowIndex(int rowIndex) {
+ this.rowIndex = rowIndex;
+ }
}
/** Mainly used for UDF framework. Note that the timestamps are at the last column. */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
index 8de927b785..b02e897235 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -209,6 +209,10 @@ public class TsBlockBuilder {
declaredPositions = 0;
+ timeColumnBuilder =
+ (TimeColumnBuilder)
+ timeColumnBuilder.newColumnBuilderLike(
+ tsBlockBuilderStatus.createColumnBuilderStatus());
for (int i = 0; i < valueColumnBuilders.length; i++) {
valueColumnBuilders[i] =
valueColumnBuilders[i].newColumnBuilderLike(
@@ -240,6 +244,7 @@ public class TsBlockBuilder {
return types.get(channel);
}
+ // Increase the number of declared positions (rows) by one
public void declarePosition() {
declaredPositions++;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
index beb17a3ea5..4efcac5996 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.read.common.block.column;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -71,6 +72,16 @@ public class BinaryColumnBuilder implements ColumnBuilder {
return this;
}
+ /** Write an Object to the current entry; the value must be a Binary. */
+ @Override
+ public ColumnBuilder writeObject(Object value) {
+ if (value instanceof Binary) {
+ writeBinary((Binary) value);
+ return this;
+ }
+ throw new UnSupportedDataTypeException("BinaryColumn only support Binary data type");
+ }
+
@Override
public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
return writeBinary(value.getBinary());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
index 74fedecd60..24a8284e25 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.read.common.block.column;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -73,6 +74,16 @@ public class BooleanColumnBuilder implements ColumnBuilder {
return this;
}
+ /** Write an Object to the current entry; the value must be a Boolean. */
+ @Override
+ public ColumnBuilder writeObject(Object value) {
+ if (value instanceof Boolean) {
+ writeBoolean((Boolean) value);
+ return this;
+ }
+ throw new UnSupportedDataTypeException("BooleanColumn only support Boolean data type");
+ }
+
@Override
public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
return writeBoolean(value.getBoolean());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
index 37efd66949..b431db1786 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
@@ -59,6 +59,11 @@ public interface ColumnBuilder {
throw new UnsupportedOperationException(getClass().getName());
}
+ /** Write an Object to the current entry, which should be of the corresponding type. */
+ default ColumnBuilder writeObject(Object value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
int appendColumn(
TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
index bff93c61df..7f546e1b69 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.read.common.block.column;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -73,6 +74,16 @@ public class DoubleColumnBuilder implements ColumnBuilder {
return this;
}
+ /** Write an Object to the current entry; the value must be a Double. */
+ @Override
+ public ColumnBuilder writeObject(Object value) {
+ if (value instanceof Double) {
+ writeDouble((Double) value);
+ return this;
+ }
+ throw new UnSupportedDataTypeException("DoubleColumn only support Double data type");
+ }
+
@Override
public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
return writeDouble(value.getDouble());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
index 0061baa03e..921fef29eb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.read.common.block.column;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -78,6 +79,16 @@ public class FloatColumnBuilder implements ColumnBuilder {
return writeFloat(value.getFloat());
}
+ /** Write an Object to the current entry; the value must be a Float. */
+ @Override
+ public ColumnBuilder writeObject(Object value) {
+ if (value instanceof Float) {
+ writeFloat((Float) value);
+ return this;
+ }
+ throw new UnSupportedDataTypeException("FloatColumn only support Float data type");
+ }
+
@Override
public int appendColumn(
TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
index 3a5db5b773..e1be6f6adb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.read.common.block.column;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -73,6 +74,16 @@ public class IntColumnBuilder implements ColumnBuilder {
return this;
}
+ /** Write an Object to the current entry; the value must be an Integer. */
+ @Override
+ public ColumnBuilder writeObject(Object value) {
+ if (value instanceof Integer) {
+ writeInt((Integer) value);
+ return this;
+ }
+ throw new UnSupportedDataTypeException("IntegerColumn only support Integer data type");
+ }
+
@Override
public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
return writeInt(value.getInt());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
index 38afe3d711..ad21bec65c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.read.common.block.column;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -73,6 +74,16 @@ public class LongColumnBuilder implements ColumnBuilder {
return this;
}
+ /** Write an Object to the current entry; the value must be a Long. */
+ @Override
+ public ColumnBuilder writeObject(Object value) {
+ if (value instanceof Long) {
+ writeLong((Long) value);
+ return this;
+ }
+ throw new UnSupportedDataTypeException("LongColumn only support Long data type");
+ }
+
@Override
public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
return writeLong(value.getLong());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index d10ba9fc68..d8b44fd384 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -104,6 +104,10 @@ public class TimeColumn implements Column {
return new TimeColumn(positionOffset + arrayOffset, length, values);
}
+ public long getStartTime() {
+ return values[arrayOffset];
+ }
+
public long getEndTime() {
return values[getPositionCount() + arrayOffset - 1];
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
index 7b5dee9368..99488d1213 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.read.common.block.column;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.openjdk.jol.info.ClassLayout;
@@ -71,6 +72,16 @@ public class TimeColumnBuilder implements ColumnBuilder {
throw new UnsupportedOperationException(getClass().getName());
}
+ /** Write an Object to the current entry; the value must be a Long. */
+ @Override
+ public ColumnBuilder writeObject(Object value) {
+ if (value instanceof Long) {
+ writeLong((Long) value);
+ return this;
+ }
+ throw new UnSupportedDataTypeException("LongColumn only support Long data type");
+ }
+
@Override
public ColumnBuilder appendNull() {
throw new UnsupportedOperationException(getClass().getName());