You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/09/29 01:39:15 UTC
[iotdb] branch master updated: [IOTDB-1638] Support vector in
aggregation query (#4005)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8e54838 [IOTDB-1638] Support vector in aggregation query (#4005)
8e54838 is described below
commit 8e548380c14ce9fa85cd1bb8556c03becfa7199b
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Sep 29 09:38:02 2021 +0800
[IOTDB-1638] Support vector in aggregation query (#4005)
---
.../cluster/query/ClusterPhysicalGenerator.java | 7 +-
.../iotdb/cluster/query/ClusterQueryRouter.java | 5 +-
.../query/aggregate/ClusterAggregateExecutor.java | 30 +-
.../query/ClusterAggregateExecutorTest.java | 8 +-
.../iotdb/AlignedTimeseriesSessionExample.java | 7 +
.../db/engine/cache/TimeSeriesMetadataCache.java | 6 +-
.../engine/compaction/utils/CompactionUtils.java | 7 +-
.../org/apache/iotdb/db/metadata/MManager.java | 17 +-
.../iotdb/db/qp/logical/crud/QueryOperator.java | 33 +-
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 5 -
.../db/qp/physical/crud/AlignByDevicePlan.java | 5 +
.../iotdb/db/qp/physical/crud/FillQueryPlan.java | 5 -
.../iotdb/db/qp/physical/crud/LastQueryPlan.java | 5 -
.../iotdb/db/qp/physical/crud/QueryIndexPlan.java | 5 -
.../iotdb/db/qp/physical/crud/QueryPlan.java | 10 -
.../db/qp/physical/crud/RawDataQueryPlan.java | 32 +-
.../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 5 +-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 7 +-
.../db/query/aggregation/AggregateResult.java | 10 +-
.../db/query/aggregation/impl/AvgAggrResult.java | 17 +-
.../db/query/aggregation/impl/CountAggrResult.java | 15 +-
.../query/aggregation/impl/ExtremeAggrResult.java | 19 +-
.../aggregation/impl/FirstValueAggrResult.java | 25 +-
.../aggregation/impl/FirstValueDescAggrResult.java | 17 +-
.../aggregation/impl/LastValueAggrResult.java | 21 +-
.../aggregation/impl/LastValueDescAggrResult.java | 17 +-
.../query/aggregation/impl/MaxTimeAggrResult.java | 19 +-
.../aggregation/impl/MaxTimeDescAggrResult.java | 13 +-
.../query/aggregation/impl/MaxValueAggrResult.java | 21 +-
.../query/aggregation/impl/MinTimeAggrResult.java | 17 +-
.../aggregation/impl/MinTimeDescAggrResult.java | 11 +-
.../query/aggregation/impl/MinValueAggrResult.java | 19 +-
.../db/query/aggregation/impl/SumAggrResult.java | 17 +-
.../db/query/dataset/AlignByDeviceDataSet.java | 15 +-
.../dataset/groupby/LocalGroupByExecutor.java | 14 +-
.../db/query/executor/AggregationExecutor.java | 377 ++++++++++++++++++---
.../iotdb/db/query/executor/QueryRouter.java | 11 +-
.../db/query/reader/series/IAggregateReader.java | 2 +-
.../iotdb/db/query/reader/series/SeriesReader.java | 38 +++
.../reader/series/VectorSeriesAggregateReader.java | 177 ++++++++++
.../reader/universal/DescPriorityMergeReader.java | 9 +-
.../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 18 -
.../reader/series/SeriesAggregateReaderTest.java | 10 +-
.../session/IoTDBSessionVectorABDeviceIT.java | 231 +++++++++++++
.../session/IoTDBSessionVectorAggregationIT.java | 266 +++++++++++++++
.../IoTDBSessionVectorAggregationWithUnSeqIT.java | 192 +++++++++++
...ctorIT.java => IoTDBSessionVectorInsertIT.java} | 15 +-
.../tsfile/file/metadata/VectorChunkMetadata.java | 4 +
.../file/metadata/VectorTimeSeriesMetadata.java | 4 +
.../apache/iotdb/tsfile/read/common/BatchData.java | 84 ++++-
.../tsfile/read/common/DescReadWriteBatchData.java | 42 +++
.../tsfile/read/common/IBatchDataIterator.java | 35 ++
.../tsfile/read/reader/BatchDataIterator.java | 54 ---
.../tsfile/read/reader/page/VectorPageReader.java | 6 +-
54 files changed, 1648 insertions(+), 413 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
index 0dcda7e..80a6a94 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan.LoadConfigurati
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +45,6 @@ import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
public class ClusterPhysicalGenerator extends PhysicalGenerator {
@@ -67,9 +65,8 @@ public class ClusterPhysicalGenerator extends PhysicalGenerator {
}
@Override
- public Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchema(List<PartialPath> paths)
- throws MetadataException {
- return getCMManager().getSeriesSchemas(paths);
+ public List<PartialPath> groupVectorPaths(List<PartialPath> paths) throws MetadataException {
+ return getCMManager().groupVectorPaths(paths);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
index e3be92c..eb45353 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
@@ -78,8 +78,9 @@ public class ClusterQueryRouter extends QueryRouter {
}
@Override
- protected AggregationExecutor getAggregationExecutor(AggregationPlan aggregationPlan) {
- return new ClusterAggregateExecutor(aggregationPlan, metaGroupMember);
+ protected AggregationExecutor getAggregationExecutor(
+ QueryContext context, AggregationPlan aggregationPlan) {
+ return new ClusterAggregateExecutor(context, aggregationPlan, metaGroupMember);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
index a007ee9..af12e5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
public class ClusterAggregateExecutor extends AggregationExecutor {
@@ -51,8 +50,9 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
*
* @param aggregationPlan
*/
- public ClusterAggregateExecutor(AggregationPlan aggregationPlan, MetaGroupMember metaMember) {
- super(aggregationPlan);
+ public ClusterAggregateExecutor(
+ QueryContext context, AggregationPlan aggregationPlan, MetaGroupMember metaMember) {
+ super(context, aggregationPlan);
this.metaMember = metaMember;
this.readerFactory = new ClusterReaderFactory(metaMember);
this.aggregator = new ClusterAggregator(metaMember);
@@ -60,24 +60,28 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
@Override
protected void aggregateOneSeries(
- Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes,
- AggregateResult[] aggregateResultList,
- Set<String> measurements,
- Filter timeFilter,
- QueryContext context)
+ PartialPath seriesPath,
+ List<Integer> indexes,
+ Set<String> allMeasurementsInDevice,
+ Filter timeFilter)
throws StorageEngineException {
- PartialPath seriesPath = pathToAggrIndexes.getKey();
- TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
+ TSDataType tsDataType = dataTypes.get(indexes.get(0));
List<String> aggregationNames = new ArrayList<>();
- for (int i : pathToAggrIndexes.getValue()) {
+ for (int i : indexes) {
aggregationNames.add(aggregations.get(i));
}
List<AggregateResult> aggregateResult =
aggregator.getAggregateResult(
- seriesPath, measurements, aggregationNames, tsDataType, timeFilter, context, ascending);
+ seriesPath,
+ allMeasurementsInDevice,
+ aggregationNames,
+ tsDataType,
+ timeFilter,
+ context,
+ ascending);
int rstIndex = 0;
- for (int i : pathToAggrIndexes.getValue()) {
+ for (int i : indexes) {
aggregateResultList[i] = aggregateResult.get(rstIndex++);
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
index 7d18a00..ffa461e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
@@ -87,8 +87,8 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
try {
- executor = new ClusterAggregateExecutor(plan, testMetaMember);
- QueryDataSet queryDataSet = executor.executeWithoutValueFilter(context, plan);
+ executor = new ClusterAggregateExecutor(context, plan, testMetaMember);
+ QueryDataSet queryDataSet = executor.executeWithoutValueFilter(plan);
assertTrue(queryDataSet.hasNext());
RowRecord record = queryDataSet.next();
List<Field> fields = record.getFields();
@@ -144,8 +144,8 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
try {
- executor = new ClusterAggregateExecutor(plan, testMetaMember);
- QueryDataSet queryDataSet = executor.executeWithValueFilter(context, plan);
+ executor = new ClusterAggregateExecutor(context, plan, testMetaMember);
+ QueryDataSet queryDataSet = executor.executeWithValueFilter(plan);
assertTrue(queryDataSet.hasNext());
RowRecord record = queryDataSet.next();
List<Field> fields = record.getFields();
diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index 885ffe9..bbfd4c7 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -144,6 +144,13 @@ public class AlignedTimeseriesSessionExample {
}
dataSet.closeOperationHandle();
+ dataSet = session.executeQueryStatement("select count(*) from root.sg_1.d1.vector");
+ System.out.println(dataSet.getColumnNames());
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
dataSet =
session.executeQueryStatement(
"select sum(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index 0755c37..eebfd66 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -213,11 +213,13 @@ public class TimeSeriesMetadataCache {
&& !bloomFilter.contains(key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement)) {
return Collections.emptyList();
}
- return readTimeseriesMetadataForVector(reader, key, subSensorList, allSensors);
+ // for the condition that cache is disabled, we only get what we need
+ Set<String> allSensorSet = new HashSet<>(subSensorList);
+ allSensorSet.add(key.measurement);
+ return readTimeseriesMetadataForVector(reader, key, subSensorList, allSensorSet);
}
List<TimeseriesMetadata> res = new ArrayList<>();
-
getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res);
if (res.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 95d9f5f..23cdb16 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.BatchDataIterator;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
@@ -110,9 +109,9 @@ public class CompactionUtils {
for (ChunkMetadata chunkMetadata : chunkMetadataList) {
IChunkReader chunkReader = new ChunkReaderByTimestamp(reader.readMemChunk(chunkMetadata));
while (chunkReader.hasNextSatisfiedPage()) {
- IPointReader iPointReader = new BatchDataIterator(chunkReader.nextPageData());
- while (iPointReader.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = iPointReader.nextTimeValuePair();
+ IPointReader batchIterator = chunkReader.nextPageData().getBatchDataIterator();
+ while (batchIterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = batchIterator.nextTimeValuePair();
timeValuePairMap.put(timeValuePair.getTimestamp(), timeValuePair);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index dacdda7..73806ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1302,20 +1302,11 @@ public class MManager {
* @param fullPaths full path list without uniting the sub measurement under the same aligned time
* series.
* @return Size of partial path list could NOT equal to the input list size. For example, the
- * VectorMeasurementSchema (s1,s2) would be returned once; Size of integer list must equal to
- * the input list size. It indicates the index of elements of original list in the result list
+ * VectorMeasurementSchema (s1,s2) would be returned once.
*/
- public Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchemas(List<PartialPath> fullPaths)
- throws MetadataException {
+ public List<PartialPath> groupVectorPaths(List<PartialPath> fullPaths) throws MetadataException {
Map<IMNode, PartialPath> nodeToPartialPath = new LinkedHashMap<>();
- Map<String, Integer> pathIndex = new LinkedHashMap<>();
- for (int i = 0; i < fullPaths.size(); i++) {
- PartialPath path = fullPaths.get(i);
- pathIndex.put(path.getExactFullPath(), i);
- if (path.isMeasurementAliasExists()) {
- pathIndex.put(path.getFullPathWithAlias(), i);
- }
-
+ for (PartialPath path : fullPaths) {
IMeasurementMNode node = getMeasurementMNode(path);
if (!nodeToPartialPath.containsKey(node)) {
nodeToPartialPath.put(node, path.copy());
@@ -1329,7 +1320,7 @@ public class MManager {
}
}
}
- return new Pair<>(new ArrayList<>(nodeToPartialPath.values()), pathIndex);
+ return new ArrayList<>(nodeToPartialPath.values());
}
// attention: this path must be a device node
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index bf78ca9..3f3282e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.query.expression.ResultColumn;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -149,21 +150,8 @@ public class QueryOperator extends Operator {
}
public void check() throws LogicalOperatorException {
- if (isAlignByDevice()) {
- if (selectComponent.hasTimeSeriesGeneratingFunction()) {
- throw new LogicalOperatorException(
- "ALIGN BY DEVICE clause is not supported in UDF queries.");
- }
-
- for (PartialPath path : selectComponent.getPaths()) {
- String device = path.getDevice();
- if (!device.isEmpty()) {
- throw new LogicalOperatorException(
- "The paths of the SELECT clause can only be single level. In other words, "
- + "the paths of the SELECT clause can only be measurements or STAR, without DOT."
- + " For more details please refer to the SQL document.");
- }
- }
+ if (isAlignByDevice() && selectComponent.hasTimeSeriesGeneratingFunction()) {
+ throw new LogicalOperatorException("ALIGN BY DEVICE clause is not supported in UDF queries.");
}
}
@@ -227,7 +215,6 @@ public class QueryOperator extends Operator {
List<PartialPath> devices = removeStarsInDeviceWithUnique(prefixPaths);
List<ResultColumn> resultColumns = selectComponent.getResultColumns();
List<String> aggregationFuncs = selectComponent.getAggregationFunctions();
-
// to record result measurement columns
List<String> measurements = new ArrayList<>();
Map<String, MeasurementInfo> measurementInfoMap = new HashMap<>();
@@ -254,12 +241,14 @@ public class QueryOperator extends Operator {
try {
// remove stars in SELECT to get actual paths
List<PartialPath> actualPaths = getMatchedTimeseries(fullPath);
+ if (suffixPath.getNodes().length > 1
+ && (actualPaths.isEmpty() || !(actualPaths.get(0) instanceof VectorPartialPath))) {
+ throw new QueryProcessException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
+ }
if (resultColumn.hasAlias() && actualPaths.size() >= 2) {
throw new QueryProcessException(
- String.format(
- "alias %s can only be matched with one time series", resultColumn.getAlias()));
+ String.format(AlignByDevicePlan.ALIAS_ERROR_MESSAGE, resultColumn.getAlias()));
}
-
if (actualPaths.isEmpty()) {
String nonExistMeasurement = getMeasurementName(fullPath, aggregation);
if (measurementSetOfGivenSuffix.add(nonExistMeasurement)) {
@@ -373,11 +362,15 @@ public class QueryOperator extends Operator {
: (((FunctionExpression) expression).getPaths().get(0));
}
+ /**
+ * If path is a vectorPartialPath, we return its measurementId + subMeasurement as the final
+ * measurement. e.g. path: root.sg.d1.vector1[s1], return "vector1.s1".
+ */
private String getMeasurementName(PartialPath path, String aggregation) {
String initialMeasurement = path.getMeasurement();
if (path instanceof VectorPartialPath) {
String subMeasurement = ((VectorPartialPath) path).getSubSensor(0);
- initialMeasurement += "." + subMeasurement;
+ initialMeasurement += TsFileConstant.PATH_SEPARATOR + subMeasurement;
}
if (aggregation != null) {
initialMeasurement = aggregation + "(" + initialMeasurement + ")";
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index bccd004..ea74ecb 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -122,9 +122,4 @@ public class AggregationPlan extends RawDataQueryPlan {
}
return columnForDisplay;
}
-
- @Override
- public boolean isRawQuery() {
- return false;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index fd1bebd..288b402 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -29,6 +29,11 @@ import java.util.Map;
public class AlignByDevicePlan extends QueryPlan {
+ public static final String MEASUREMENT_ERROR_MESSAGE =
+ "The paths of the SELECT clause can only be measurements or STAR.";
+ public static final String ALIAS_ERROR_MESSAGE =
+ "alias %s can only be matched with one time series";
+
// to record result measurement columns, e.g. temperature, status, speed
private List<String> measurements;
private Map<String, MeasurementInfo> measurementInfoMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
index 300cc85..a952e7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
@@ -49,9 +49,4 @@ public class FillQueryPlan extends RawDataQueryPlan {
public void setFillType(Map<TSDataType, IFill> fillType) {
this.fillType = fillType;
}
-
- @Override
- public boolean isRawQuery() {
- return false;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
index 21c3424..9f4f684 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
@@ -75,9 +75,4 @@ public class LastQueryPlan extends RawDataQueryPlan {
}
return false;
}
-
- @Override
- public boolean isRawQuery() {
- return false;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
index 8a931bf..27d20f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
@@ -73,9 +73,4 @@ public class QueryIndexPlan extends RawDataQueryPlan {
public String toString() {
return String.format("Query paths: %s, index type: %s, props: %s", paths, indexType, props);
}
-
- @Override
- public boolean isRawQuery() {
- return false;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index aff2b51..52cb0d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -45,8 +45,6 @@ public abstract class QueryPlan extends PhysicalPlan {
private Map<String, Integer> pathToIndex = new HashMap<>();
- private Map<String, Integer> vectorPathToIndex = new HashMap<>();
-
private boolean enableRedirect = false;
// if true, we don't need the row whose any column is null
@@ -150,14 +148,6 @@ public abstract class QueryPlan extends PhysicalPlan {
this.enableRedirect = enableRedirect;
}
- public Map<String, Integer> getVectorPathToIndex() {
- return vectorPathToIndex;
- }
-
- public void setVectorPathToIndex(Map<String, Integer> vectorPathToIndex) {
- this.vectorPathToIndex = vectorPathToIndex;
- }
-
public List<ResultColumn> getResultColumns() {
return resultColumns;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 5936633..4e59686 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -44,6 +44,7 @@ public class RawDataQueryPlan extends QueryPlan {
private IExpression expression = null;
private Map<String, Set<String>> deviceToMeasurements = new HashMap<>();
+ // TODO: remove this when all types of query supporting vector
/** used to group all the sub sensors of one vector into VectorPartialPath */
private List<PartialPath> deduplicatedVectorPaths = new ArrayList<>();
@@ -93,11 +94,9 @@ public class RawDataQueryPlan extends QueryPlan {
}
}
- if (isRawQuery()) {
- // if it is a RawQueryWithoutValueFilter, we also need to group all the subSensors of one
- // vector into one VectorPartialPath
- transformVectorPaths(physicalGenerator, columnForDisplaySet);
- }
+ // if it is a RawQueryWithoutValueFilter, we also need to group all the subSensors of one
+ // vector into one VectorPartialPath
+ groupVectorPaths(physicalGenerator);
}
public IExpression getExpression() {
@@ -173,25 +172,13 @@ public class RawDataQueryPlan extends QueryPlan {
* them directly into deduplicatedPaths and deduplicatedDataTypes, because we don't know whether
* the raw query has value filter here.
*/
- public void transformVectorPaths(
- PhysicalGenerator physicalGenerator, Set<String> columnForDisplaySet)
- throws MetadataException {
- Pair<List<PartialPath>, Map<String, Integer>> pair =
- physicalGenerator.getSeriesSchema(getDeduplicatedPaths());
-
- List<PartialPath> vectorizedDeduplicatedPaths = pair.left;
+ public void groupVectorPaths(PhysicalGenerator physicalGenerator) throws MetadataException {
+ List<PartialPath> vectorizedDeduplicatedPaths =
+ physicalGenerator.groupVectorPaths(getDeduplicatedPaths());
List<TSDataType> vectorizedDeduplicatedDataTypes =
new ArrayList<>(physicalGenerator.getSeriesTypes(vectorizedDeduplicatedPaths));
setDeduplicatedVectorPaths(vectorizedDeduplicatedPaths);
setDeduplicatedVectorDataTypes(vectorizedDeduplicatedDataTypes);
-
- Map<String, Integer> columnForDisplayToQueryDataSetIndex = pair.right;
- Map<String, Integer> vectorPathToIndex = new HashMap<>();
- for (String columnForDisplay : columnForDisplaySet) {
- vectorPathToIndex.put(
- columnForDisplay, columnForDisplayToQueryDataSetIndex.get(columnForDisplay));
- }
- setVectorPathToIndex(vectorPathToIndex);
}
public List<PartialPath> getDeduplicatedVectorPaths() {
@@ -218,11 +205,6 @@ public class RawDataQueryPlan extends QueryPlan {
if (!this.deduplicatedVectorPaths.isEmpty()) {
this.deduplicatedPaths = this.deduplicatedVectorPaths;
this.deduplicatedDataTypes = this.deduplicatedVectorDataTypes;
- setPathToIndex(getVectorPathToIndex());
}
}
-
- public boolean isRawQuery() {
- return true;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index 57aeff8..502d996 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -129,8 +129,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
return pathNameToReaderIndex.get(pathName);
}
- @Override
- public boolean isRawQuery() {
- return false;
+ public void setPathNameToReaderIndex(Map<String, Integer> pathNameToReaderIndex) {
+ this.pathNameToReaderIndex = pathNameToReaderIndex;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index abbb074..335de0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -29,10 +29,8 @@ import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan.LoadConfigurati
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
import java.util.List;
-import java.util.Map;
/** Used to convert logical operator to physical plan */
public class PhysicalGenerator {
@@ -60,8 +58,7 @@ public class PhysicalGenerator {
return SchemaUtils.getSeriesTypesByPaths(paths);
}
- public Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchema(List<PartialPath> paths)
- throws MetadataException {
- return IoTDB.metaManager.getSeriesSchemas(paths);
+ public List<PartialPath> groupVectorPaths(List<PartialPath> paths) throws MetadataException {
+ return IoTDB.metaManager.groupVectorPaths(paths);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 8016be7..a1f000f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -72,20 +72,20 @@ public abstract class AggregateResult {
/**
* Aggregate results cannot be calculated using Statistics directly, using the data in each page
*
- * @param dataInThisPage the data in Page
+ * @param batchIterator the data in Page
*/
- public abstract void updateResultFromPageData(BatchData dataInThisPage)
+ public abstract void updateResultFromPageData(IBatchDataIterator batchIterator)
throws IOException, QueryProcessException;
/**
* Aggregate results cannot be calculated using Statistics directly, using the data in each page
*
- * @param dataInThisPage the data in Page
+ * @param batchIterator the data in Page
* @param minBound calculate points whose time >= bound
* @param maxBound calculate points whose time < bound
*/
public abstract void updateResultFromPageData(
- BatchData dataInThisPage, long minBound, long maxBound) throws IOException;
+ IBatchDataIterator batchIterator, long minBound, long maxBound) throws IOException;
/**
* This method calculates the aggregation using common timestamps of the cross series filter.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index d808c0b..a1fffa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -82,18 +82,19 @@ public class AvgAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage) {
- updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+ public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
- while (dataInThisPage.hasCurrent()) {
- if (dataInThisPage.currentTime() >= maxBound || dataInThisPage.currentTime() < minBound) {
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ while (batchIterator.hasNext()) {
+ if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
break;
}
- updateAvg(seriesDataType, dataInThisPage.currentValue());
- dataInThisPage.next();
+ updateAvg(seriesDataType, batchIterator.currentValue());
+ batchIterator.next();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 00bfcad..10b9ad0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
import java.io.OutputStream;
@@ -49,19 +49,20 @@ public class CountAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage) {
- setLongValue(getLongValue() + dataInThisPage.length());
+ public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+ setLongValue(getLongValue() + batchIterator.totalLength());
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
int cnt = 0;
- while (dataInThisPage.hasCurrent()) {
- if (dataInThisPage.currentTime() >= maxBound || dataInThisPage.currentTime() < minBound) {
+ while (batchIterator.hasNext()) {
+ if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
break;
}
cnt++;
- dataInThisPage.next();
+ batchIterator.next();
}
setLongValue(getLongValue() + cnt);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
index fe235e3..144fe00 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -93,20 +93,21 @@ public class ExtremeAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage) {
- updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+ public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
Comparable<Object> extVal = null;
- while (dataInThisPage.hasCurrent()
- && dataInThisPage.currentTime() < maxBound
- && dataInThisPage.currentTime() >= minBound) {
+ while (batchIterator.hasNext()
+ && batchIterator.currentTime() < maxBound
+ && batchIterator.currentTime() >= minBound) {
- extVal = getExtremeValue(extVal, (Comparable<Object>) dataInThisPage.currentValue());
- dataInThisPage.next();
+ extVal = getExtremeValue(extVal, (Comparable<Object>) batchIterator.currentValue());
+ batchIterator.next();
}
updateResult(extVal);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 56ce8c5..f8a2e51 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -64,27 +64,28 @@ public class FirstValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage) {
+ public void updateResultFromPageData(IBatchDataIterator batchIterator) {
if (hasFinalResult()) {
return;
}
- if (dataInThisPage.hasCurrent()) {
- setValue(dataInThisPage.currentValue());
- timestamp = dataInThisPage.currentTime();
+ if (batchIterator.hasNext()) {
+ setValue(batchIterator.currentValue());
+ timestamp = batchIterator.currentTime();
}
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
if (hasFinalResult()) {
return;
}
- if (dataInThisPage.hasCurrent()
- && dataInThisPage.currentTime() < maxBound
- && dataInThisPage.currentTime() >= minBound) {
- setValue(dataInThisPage.currentValue());
- timestamp = dataInThisPage.currentTime();
- dataInThisPage.next();
+ if (batchIterator.hasNext()
+ && batchIterator.currentTime() < maxBound
+ && batchIterator.currentTime() >= minBound) {
+ setValue(batchIterator.currentValue());
+ timestamp = batchIterator.currentTime();
+ batchIterator.next();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 8eae923..dc20617 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
@@ -40,13 +40,14 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
- while (dataInThisPage.hasCurrent()
- && dataInThisPage.currentTime() < maxBound
- && dataInThisPage.currentTime() >= minBound) {
- setValue(dataInThisPage.currentValue());
- timestamp = dataInThisPage.currentTime();
- dataInThisPage.next();
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ while (batchIterator.hasNext()
+ && batchIterator.currentTime() < maxBound
+ && batchIterator.currentTime() >= minBound) {
+ setValue(batchIterator.currentValue());
+ timestamp = batchIterator.currentTime();
+ batchIterator.next();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index ad06ace..443751c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -60,20 +60,21 @@ public class LastValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage) {
- updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+ public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
long time = Long.MIN_VALUE;
Object lastVal = null;
- while (dataInThisPage.hasCurrent()
- && dataInThisPage.currentTime() < maxBound
- && dataInThisPage.currentTime() >= minBound) {
- time = dataInThisPage.currentTime();
- lastVal = dataInThisPage.currentValue();
- dataInThisPage.next();
+ while (batchIterator.hasNext()
+ && batchIterator.currentTime() < maxBound
+ && batchIterator.currentTime() >= minBound) {
+ time = batchIterator.currentTime();
+ lastVal = batchIterator.currentValue();
+ batchIterator.next();
}
if (time != Long.MIN_VALUE) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 587c7a3..981167b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
@@ -42,18 +42,19 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
if (hasFinalResult()) {
return;
}
long time = Long.MIN_VALUE;
Object lastVal = null;
- if (dataInThisPage.hasCurrent()
- && dataInThisPage.currentTime() < maxBound
- && dataInThisPage.currentTime() >= minBound) {
- time = dataInThisPage.currentTime();
- lastVal = dataInThisPage.currentValue();
- dataInThisPage.next();
+ if (batchIterator.hasNext()
+ && batchIterator.currentTime() < maxBound
+ && batchIterator.currentTime() >= minBound) {
+ time = batchIterator.currentTime();
+ lastVal = batchIterator.currentValue();
+ batchIterator.next();
}
if (time != Long.MIN_VALUE) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index fad90de..46ebe15 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
import java.io.OutputStream;
@@ -49,17 +49,18 @@ public class MaxTimeAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage) {
- updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+ public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
- while (dataInThisPage.hasCurrent()
- && dataInThisPage.currentTime() < maxBound
- && dataInThisPage.currentTime() >= minBound) {
- updateMaxTimeResult(dataInThisPage.currentTime());
- dataInThisPage.next();
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ while (batchIterator.hasNext()
+ && batchIterator.currentTime() < maxBound
+ && batchIterator.currentTime() >= minBound) {
+ updateMaxTimeResult(batchIterator.currentTime());
+ batchIterator.next();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index a69dcc9..e867bf7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
@@ -35,14 +35,15 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
if (hasFinalResult()) {
return;
}
- if (dataInThisPage.hasCurrent()
- && dataInThisPage.currentTime() < maxBound
- && dataInThisPage.currentTime() >= minBound) {
- updateMaxTimeResult(dataInThisPage.currentTime());
+ if (batchIterator.hasNext()
+ && batchIterator.currentTime() < maxBound
+ && batchIterator.currentTime() >= minBound) {
+ updateMaxTimeResult(batchIterator.currentTime());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index 34d9622..a61583b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
import java.io.OutputStream;
@@ -49,21 +49,22 @@ public class MaxValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage) {
- updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+ public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
Comparable<Object> maxVal = null;
- while (dataInThisPage.hasCurrent()
- && dataInThisPage.currentTime() < maxBound
- && dataInThisPage.currentTime() >= minBound) {
- if (maxVal == null || maxVal.compareTo(dataInThisPage.currentValue()) < 0) {
- maxVal = (Comparable<Object>) dataInThisPage.currentValue();
+ while (batchIterator.hasNext()
+ && batchIterator.currentTime() < maxBound
+ && batchIterator.currentTime() >= minBound) {
+ if (maxVal == null || maxVal.compareTo(batchIterator.currentValue()) < 0) {
+ maxVal = (Comparable<Object>) batchIterator.currentValue();
}
- dataInThisPage.next();
+ batchIterator.next();
}
updateResult(maxVal);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 37883d5..4d0365f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
import java.io.OutputStream;
@@ -52,19 +52,20 @@ public class MinTimeAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage) {
- updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+ public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
if (hasFinalResult()) {
return;
}
- if (dataInThisPage.hasCurrent()
- && dataInThisPage.currentTime() < maxBound
- && dataInThisPage.currentTime() >= minBound) {
- setLongValue(dataInThisPage.currentTime());
+ if (batchIterator.hasNext()
+ && batchIterator.currentTime() < maxBound
+ && batchIterator.currentTime() >= minBound) {
+ setLongValue(batchIterator.currentTime());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 6b45f14..9abceb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
@@ -33,10 +33,11 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
- while (dataInThisPage.hasCurrent() && dataInThisPage.currentTime() >= minBound) {
- setValue(dataInThisPage.currentTime());
- dataInThisPage.next();
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ while (batchIterator.hasNext() && batchIterator.currentTime() >= minBound) {
+ setValue(batchIterator.currentTime());
+ batchIterator.next();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index eefbbb1..f3c01ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
import java.io.OutputStream;
@@ -49,17 +49,18 @@ public class MinValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage) {
- updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+ public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
- while (dataInThisPage.hasCurrent()
- && dataInThisPage.currentTime() < maxBound
- && dataInThisPage.currentTime() >= minBound) {
- updateResult((Comparable<Object>) dataInThisPage.currentValue());
- dataInThisPage.next();
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ while (batchIterator.hasNext()
+ && batchIterator.currentTime() < maxBound
+ && batchIterator.currentTime() >= minBound) {
+ updateResult((Comparable<Object>) batchIterator.currentValue());
+ batchIterator.next();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index 8a11502..2590d3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -62,18 +62,19 @@ public class SumAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage) {
- updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+ public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
}
@Override
- public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
- while (dataInThisPage.hasCurrent()) {
- if (dataInThisPage.currentTime() >= maxBound || dataInThisPage.currentTime() < minBound) {
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ while (batchIterator.hasNext()) {
+ if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
break;
}
- updateSum(dataInThisPage.currentValue());
- dataInThisPage.next();
+ updateSum(batchIterator.currentValue());
+ batchIterator.next();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 86008d1..7ab4095 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.IQueryRouter;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
@@ -224,6 +225,11 @@ public class AlignByDeviceDataSet extends QueryDataSet {
return false;
}
+ /**
+ * Get all measurements under given device. For a vectorMeasurementSchema, we return its
+ * measurementId + all subMeasurement. e.g. schema: vector1[s1, s2], return ["vector1.s1",
+ * "vector1.s2"].
+ */
protected Set<String> getMeasurementsUnderGivenDevice(PartialPath device) throws IOException {
try {
Set<String> res = new HashSet<>();
@@ -233,7 +239,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
for (IMeasurementSchema schema : measurementSchemas) {
if (schema instanceof VectorMeasurementSchema) {
for (String subMeasurement : schema.getSubMeasurementsList()) {
- res.add(schema.getMeasurementId() + "." + subMeasurement);
+ res.add(schema.getMeasurementId() + TsFileConstant.PATH_SEPARATOR + subMeasurement);
}
} else {
res.add(schema.getMeasurementId());
@@ -246,6 +252,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
}
}
+ /**
+ * Attention. For a vectorPath(root.sg.d1.vector1.s1), device is root.sg.d1, measurement is
+ * "vector1.s1".
+ */
private PartialPath transformPath(PartialPath device, String measurement) throws IOException {
try {
PartialPath fullPath = new PartialPath(device.getFullPath(), measurement);
@@ -253,7 +263,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
if (schema instanceof MeasurementSchema) {
return fullPath;
} else {
- return new VectorPartialPath(fullPath.getDevice(), fullPath.getMeasurement());
+ String vectorPath = fullPath.getDevice();
+ return new VectorPartialPath(vectorPath, fullPath.getMeasurement());
}
} catch (MetadataException e) {
throw new IOException("Cannot get node from " + device, e);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
index b8cec3d..ef7704e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -133,19 +134,20 @@ public class LocalGroupByExecutor implements GroupByExecutor {
}
// lazy reset batch data for calculation
batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
+ IBatchDataIterator batchIterator = batchData.getBatchDataIterator();
if (ascending) {
// skip points that cannot be calculated
- while (batchData.hasCurrent() && batchData.currentTime() < curStartTime) {
- batchData.next();
+ while (batchIterator.hasNext() && batchIterator.currentTime() < curStartTime) {
+ batchIterator.next();
}
} else {
- while (batchData.hasCurrent() && batchData.currentTime() >= curEndTime) {
- batchData.next();
+ while (batchIterator.hasNext() && batchIterator.currentTime() >= curEndTime) {
+ batchIterator.next();
}
}
- if (batchData.hasCurrent()) {
- result.updateResultFromPageData(batchData, curStartTime, curEndTime);
+ if (batchIterator.hasNext()) {
+ result.updateResultFromPageData(batchIterator, curStartTime, curEndTime);
}
}
lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index c01f3c3..0b711f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -40,12 +41,14 @@ import org.apache.iotdb.db.query.reader.series.IAggregateReader;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.series.VectorSeriesAggregateReader;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
@@ -72,26 +75,25 @@ public class AggregationExecutor {
protected List<String> aggregations;
protected IExpression expression;
protected boolean ascending;
+ protected QueryContext context;
+ protected AggregateResult[] aggregateResultList;
/** aggregation batch calculation size. */
private int aggregateFetchSize;
- protected AggregationExecutor(AggregationPlan aggregationPlan) {
+ protected AggregationExecutor(QueryContext context, AggregationPlan aggregationPlan) {
this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
this.aggregations = aggregationPlan.getDeduplicatedAggregations();
this.expression = aggregationPlan.getExpression();
this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
this.ascending = aggregationPlan.isAscending();
+ this.context = context;
+ this.aggregateResultList = new AggregateResult[selectedSeries.size()];
}
- /**
- * execute aggregate function with only time filter or no filter.
- *
- * @param context query context
- */
- public QueryDataSet executeWithoutValueFilter(
- QueryContext context, AggregationPlan aggregationPlan)
+ /** execute aggregate function with only time filter or no filter. */
+ public QueryDataSet executeWithoutValueFilter(AggregationPlan aggregationPlan)
throws StorageEngineException, IOException, QueryProcessException {
Filter timeFilter = null;
@@ -102,18 +104,29 @@ public class AggregationExecutor {
// TODO use multi-thread
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
groupAggregationsBySeries(selectedSeries);
- AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
// TODO-Cluster: group the paths by storage group to reduce communications
List<StorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
+
+ // Attention: this method will REMOVE vector path from pathToAggrIndexesMap
+ Map<PartialPath, List<List<Integer>>> vectorPathIndexesMap =
+ groupVectorSeries(pathToAggrIndexesMap);
try {
for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
+ PartialPath seriesPath = entry.getKey();
aggregateOneSeries(
- entry,
- aggregateResultList,
- aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
- timeFilter,
- context);
+ seriesPath,
+ entry.getValue(),
+ aggregationPlan.getAllMeasurementsInDevice(seriesPath.getDevice()),
+ timeFilter);
+ }
+ for (Map.Entry<PartialPath, List<List<Integer>>> entry : vectorPathIndexesMap.entrySet()) {
+ VectorPartialPath vectorSeries = (VectorPartialPath) entry.getKey();
+ aggregateOneVectorSeries(
+ vectorSeries,
+ entry.getValue(),
+ aggregationPlan.getAllMeasurementsInDevice(vectorSeries.getDevice()),
+ timeFilter);
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
@@ -125,25 +138,20 @@ public class AggregationExecutor {
/**
* get aggregation result for one series
*
- * @param pathToAggrIndexes entry of path to aggregation indexes map
* @param timeFilter time filter
- * @param context query context
*/
protected void aggregateOneSeries(
- Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes,
- AggregateResult[] aggregateResultList,
- Set<String> measurements,
- Filter timeFilter,
- QueryContext context)
+ PartialPath seriesPath,
+ List<Integer> indexes,
+ Set<String> allMeasurementsInDevice,
+ Filter timeFilter)
throws IOException, QueryProcessException, StorageEngineException {
List<AggregateResult> ascAggregateResultList = new ArrayList<>();
List<AggregateResult> descAggregateResultList = new ArrayList<>();
boolean[] isAsc = new boolean[aggregateResultList.length];
- PartialPath seriesPath = pathToAggrIndexes.getKey();
- TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
-
- for (int i : pathToAggrIndexes.getValue()) {
+ TSDataType tsDataType = dataTypes.get(indexes.get(0));
+ for (int i : indexes) {
// construct AggregateResult
AggregateResult aggregateResult =
AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
@@ -156,7 +164,7 @@ public class AggregationExecutor {
}
aggregateOneSeries(
seriesPath,
- measurements,
+ allMeasurementsInDevice,
context,
timeFilter,
tsDataType,
@@ -166,7 +174,7 @@ public class AggregationExecutor {
int ascIndex = 0;
int descIndex = 0;
- for (int i : pathToAggrIndexes.getValue()) {
+ for (int i : indexes) {
aggregateResultList[i] =
isAsc[i]
? ascAggregateResultList.get(ascIndex++)
@@ -174,6 +182,58 @@ public class AggregationExecutor {
}
}
+ protected void aggregateOneVectorSeries(
+ VectorPartialPath seriesPath,
+ List<List<Integer>> subIndexes,
+ Set<String> allMeasurementsInDevice,
+ Filter timeFilter)
+ throws IOException, QueryProcessException, StorageEngineException {
+ List<List<AggregateResult>> ascAggregateResultList = new ArrayList<>();
+ List<List<AggregateResult>> descAggregateResultList = new ArrayList<>();
+ boolean[] isAsc = new boolean[aggregateResultList.length];
+
+ for (List<Integer> subIndex : subIndexes) {
+ TSDataType tsDataType = dataTypes.get(subIndex.get(0));
+ List<AggregateResult> subAscResultList = new ArrayList<>();
+ List<AggregateResult> subDescResultList = new ArrayList<>();
+ for (int i : subIndex) {
+ // construct AggregateResult
+ AggregateResult aggregateResult =
+ AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
+ if (aggregateResult.isAscending()) {
+ subAscResultList.add(aggregateResult);
+ isAsc[i] = true;
+ } else {
+ subDescResultList.add(aggregateResult);
+ }
+ }
+ ascAggregateResultList.add(subAscResultList);
+ descAggregateResultList.add(subDescResultList);
+ }
+
+ aggregateOneVectorSeries(
+ seriesPath,
+ allMeasurementsInDevice,
+ context,
+ timeFilter,
+ TSDataType.VECTOR,
+ ascAggregateResultList,
+ descAggregateResultList,
+ null);
+
+ for (int i = 0; i < subIndexes.size(); i++) {
+ List<Integer> subIndex = subIndexes.get(i);
+ List<AggregateResult> subAscResultList = ascAggregateResultList.get(i);
+ List<AggregateResult> subDescResultList = descAggregateResultList.get(i);
+ int ascIndex = 0;
+ int descIndex = 0;
+ for (int index : subIndex) {
+ aggregateResultList[index] =
+ isAsc[index] ? subAscResultList.get(ascIndex++) : subDescResultList.get(descIndex++);
+ }
+ }
+ }
+
@SuppressWarnings("squid:S107")
public static void aggregateOneSeries(
PartialPath seriesPath,
@@ -225,6 +285,65 @@ public class AggregationExecutor {
}
}
+ public static void aggregateOneVectorSeries(
+ VectorPartialPath seriesPath,
+ Set<String> measurements,
+ QueryContext context,
+ Filter timeFilter,
+ TSDataType tsDataType,
+ List<List<AggregateResult>> ascAggregateResultList,
+ List<List<AggregateResult>> descAggregateResultList,
+ TsFileFilter fileFilter)
+ throws StorageEngineException, IOException, QueryProcessException {
+
+ // construct series reader without value filter
+ QueryDataSource queryDataSource =
+ QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
+ if (fileFilter != null) {
+ QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
+ }
+ // update filter by TTL
+ timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+ if (!isAggregateResultEmpty(ascAggregateResultList)) {
+ VectorSeriesAggregateReader seriesReader =
+ new VectorSeriesAggregateReader(
+ seriesPath,
+ measurements,
+ tsDataType,
+ context,
+ queryDataSource,
+ timeFilter,
+ null,
+ null,
+ true);
+ aggregateFromVectorReader(seriesReader, ascAggregateResultList);
+ }
+ if (!isAggregateResultEmpty(descAggregateResultList)) {
+ VectorSeriesAggregateReader seriesReader =
+ new VectorSeriesAggregateReader(
+ seriesPath,
+ measurements,
+ tsDataType,
+ context,
+ queryDataSource,
+ timeFilter,
+ null,
+ null,
+ false);
+ aggregateFromVectorReader(seriesReader, descAggregateResultList);
+ }
+ }
+
+ private static boolean isAggregateResultEmpty(List<List<AggregateResult>> resultList) {
+ for (List<AggregateResult> result : resultList) {
+ if (!result.isEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private static void aggregateFromReader(
IAggregateReader seriesReader, List<AggregateResult> aggregateResultList)
@@ -270,6 +389,69 @@ public class AggregationExecutor {
}
}
+ private static void aggregateFromVectorReader(
+ VectorSeriesAggregateReader seriesReader, List<List<AggregateResult>> aggregateResultList)
+ throws QueryProcessException, IOException {
+ int remainingToCalculate = 0;
+ List<boolean[]> isCalculatedArray = new ArrayList<>();
+ for (List<AggregateResult> subAggregateResults : aggregateResultList) {
+ remainingToCalculate += subAggregateResults.size();
+ boolean[] subCalculatedArray = new boolean[subAggregateResults.size()];
+ isCalculatedArray.add(subCalculatedArray);
+ }
+
+ while (seriesReader.hasNextFile()) {
+ // cal by file statistics
+ if (seriesReader.canUseCurrentFileStatistics()) {
+ while (seriesReader.hasNextSubSeries()) {
+ Statistics fileStatistics = seriesReader.currentFileStatistics();
+ remainingToCalculate =
+ aggregateStatistics(
+ aggregateResultList.get(seriesReader.getCurIndex()),
+ isCalculatedArray.get(seriesReader.getCurIndex()),
+ remainingToCalculate,
+ fileStatistics);
+ if (remainingToCalculate == 0) {
+ seriesReader.resetIndex();
+ return;
+ }
+ seriesReader.nextSeries();
+ }
+ seriesReader.skipCurrentFile();
+ continue;
+ }
+
+ while (seriesReader.hasNextChunk()) {
+ // cal by chunk statistics
+ if (seriesReader.canUseCurrentChunkStatistics()) {
+ while (seriesReader.hasNextSubSeries()) {
+ Statistics chunkStatistics = seriesReader.currentChunkStatistics();
+ remainingToCalculate =
+ aggregateStatistics(
+ aggregateResultList.get(seriesReader.getCurIndex()),
+ isCalculatedArray.get(seriesReader.getCurIndex()),
+ remainingToCalculate,
+ chunkStatistics);
+ if (remainingToCalculate == 0) {
+ seriesReader.resetIndex();
+ return;
+ }
+ seriesReader.nextSeries();
+ }
+ seriesReader.skipCurrentChunk();
+ continue;
+ }
+
+ remainingToCalculate =
+ aggregateVectorPages(
+ seriesReader, aggregateResultList, isCalculatedArray, remainingToCalculate);
+ if (remainingToCalculate == 0) {
+ return;
+ }
+ }
+ }
+ }
+
/** Aggregate each result in the list with the statistics */
private static int aggregateStatistics(
List<AggregateResult> aggregateResultList,
@@ -314,31 +496,87 @@ public class AggregationExecutor {
seriesReader.skipCurrentPage();
continue;
}
- BatchData nextOverlappedPageData = seriesReader.nextPage();
- for (int i = 0; i < aggregateResultList.size(); i++) {
- if (!isCalculatedArray[i]) {
- AggregateResult aggregateResult = aggregateResultList.get(i);
- aggregateResult.updateResultFromPageData(nextOverlappedPageData);
- nextOverlappedPageData.resetBatchData();
- if (aggregateResult.hasFinalResult()) {
- isCalculatedArray[i] = true;
- remainingToCalculate--;
- if (remainingToCalculate == 0) {
- return 0;
- }
+ IBatchDataIterator batchDataIterator = seriesReader.nextPage().getBatchDataIterator();
+ remainingToCalculate =
+ aggregateBatchData(
+ aggregateResultList, isCalculatedArray, remainingToCalculate, batchDataIterator);
+ }
+ return remainingToCalculate;
+ }
+
+ private static int aggregateVectorPages(
+ VectorSeriesAggregateReader seriesReader,
+ List<List<AggregateResult>> aggregateResultList,
+ List<boolean[]> isCalculatedArray,
+ int remainingToCalculate)
+ throws IOException, QueryProcessException {
+ while (seriesReader.hasNextPage()) {
+ // cal by page statistics
+ if (seriesReader.canUseCurrentPageStatistics()) {
+ while (seriesReader.hasNextSubSeries()) {
+ Statistics pageStatistic = seriesReader.currentPageStatistics();
+ remainingToCalculate =
+ aggregateStatistics(
+ aggregateResultList.get(seriesReader.getCurIndex()),
+ isCalculatedArray.get(seriesReader.getCurIndex()),
+ remainingToCalculate,
+ pageStatistic);
+ if (remainingToCalculate == 0) {
+ seriesReader.resetIndex();
+ return 0;
}
+ seriesReader.nextSeries();
}
+ seriesReader.skipCurrentPage();
+ continue;
+ }
+
+ BatchData nextOverlappedPageData = seriesReader.nextPage();
+ while (seriesReader.hasNextSubSeries()) {
+ int subIndex = seriesReader.getCurIndex();
+ IBatchDataIterator batchIterator = nextOverlappedPageData.getBatchDataIterator(subIndex);
+ remainingToCalculate =
+ aggregateBatchData(
+ aggregateResultList.get(subIndex),
+ isCalculatedArray.get(subIndex),
+ remainingToCalculate,
+ batchIterator);
+ if (remainingToCalculate == 0) {
+ seriesReader.resetIndex();
+ return 0;
+ }
+ seriesReader.nextSeries();
}
}
return remainingToCalculate;
}
- /**
- * execute aggregate function with value filter.
- *
- * @param context query context.
- */
- public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
+ private static int aggregateBatchData(
+ List<AggregateResult> aggregateResultList,
+ boolean[] isCalculatedArray,
+ int remainingToCalculate,
+ IBatchDataIterator batchIterator)
+ throws QueryProcessException, IOException {
+ int newRemainingToCalculate = remainingToCalculate;
+ for (int i = 0; i < aggregateResultList.size(); i++) {
+ if (!isCalculatedArray[i]) {
+ AggregateResult aggregateResult = aggregateResultList.get(i);
+ aggregateResult.updateResultFromPageData(batchIterator);
+ batchIterator.reset();
+ if (aggregateResult.hasFinalResult()) {
+ isCalculatedArray[i] = true;
+ remainingToCalculate--;
+ if (remainingToCalculate == 0) {
+ return newRemainingToCalculate;
+ }
+ }
+ }
+ }
+ return newRemainingToCalculate;
+ }
+
+ /** execute aggregate function with value filter. */
+ public QueryDataSet executeWithValueFilter(AggregationPlan queryPlan)
throws StorageEngineException, IOException, QueryProcessException {
optimizeLastElementFunc(queryPlan);
@@ -362,15 +600,13 @@ public class AggregationExecutor {
StorageEngine.getInstance().mergeUnLock(list);
}
- List<AggregateResult> aggregateResults = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
- AggregateResult result =
+ aggregateResultList[i] =
AggregateResultFactory.getAggrResultByName(
aggregations.get(i), dataTypes.get(i), ascending);
- aggregateResults.add(result);
}
- aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
- return constructDataSet(aggregateResults, queryPlan);
+ aggregateWithValueFilter(timestampGenerator, readerToAggrIndexesMap);
+ return constructDataSet(Arrays.asList(aggregateResultList), queryPlan);
}
private void optimizeLastElementFunc(QueryPlan queryPlan) {
@@ -408,7 +644,6 @@ public class AggregationExecutor {
/** calculate aggregation result with value filter. */
private void aggregateWithValueFilter(
- List<AggregateResult> aggregateResults,
TimeGenerator timestampGenerator,
Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
throws IOException {
@@ -435,17 +670,16 @@ public class AggregationExecutor {
if (cached.get(pathId)) {
Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
for (Integer i : entry.getValue()) {
- aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+ aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
}
} else {
if (entry.getValue().size() == 1) {
- aggregateResults
- .get(entry.getValue().get(0))
- .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
+ aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps(
+ timeArray, timeArrayLength, entry.getKey());
} else {
Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
for (Integer i : entry.getValue()) {
- aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+ aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
}
}
}
@@ -511,4 +745,35 @@ public class AggregationExecutor {
}
return pathToAggrIndexesMap;
}
+
+ /**
+ * Group all the subSensors of one vector into one VectorPartialPath and Remove vectorPartialPath
+ * from pathToAggrIndexesMap. For example, input map: vector1[s1] -> [1, 3], vector1[s2] -> [2,4],
+ * will return vector1[s1,s2], [[1,3], [2,4]]
+ */
+ private Map<PartialPath, List<List<Integer>>> groupVectorSeries(
+ Map<PartialPath, List<Integer>> pathToAggrIndexesMap) {
+ Map<PartialPath, List<List<Integer>>> result = new HashMap<>();
+ Map<String, VectorPartialPath> temp = new HashMap<>();
+
+ List<PartialPath> seriesPaths = new ArrayList<>(pathToAggrIndexesMap.keySet());
+ for (PartialPath seriesPath : seriesPaths) {
+ if (seriesPath instanceof VectorPartialPath) {
+ List<Integer> indexes = pathToAggrIndexesMap.remove(seriesPath);
+ VectorPartialPath groupPath = temp.get(seriesPath.getFullPath());
+ if (groupPath == null) {
+ groupPath = (VectorPartialPath) seriesPath.copy();
+ temp.put(seriesPath.getFullPath(), groupPath);
+ result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes);
+ } else {
+ // groupPath is changed here so we update it
+ List<List<Integer>> subIndexes = result.remove(groupPath);
+ subIndexes.add(indexes);
+ groupPath.addSubSensor(((VectorPartialPath) seriesPath).getSubSensorsList());
+ result.put(groupPath, subIndexes);
+ }
+ }
+ }
+ return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index d2ae91f..a134d8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -139,22 +139,23 @@ public class QueryRouter implements IQueryRouter {
aggregationPlan.setExpression(optimizedExpression);
- AggregationExecutor engineExecutor = getAggregationExecutor(aggregationPlan);
+ AggregationExecutor engineExecutor = getAggregationExecutor(context, aggregationPlan);
QueryDataSet dataSet = null;
if (optimizedExpression != null
&& optimizedExpression.getType() != ExpressionType.GLOBAL_TIME) {
- dataSet = engineExecutor.executeWithValueFilter(context, aggregationPlan);
+ dataSet = engineExecutor.executeWithValueFilter(aggregationPlan);
} else {
- dataSet = engineExecutor.executeWithoutValueFilter(context, aggregationPlan);
+ dataSet = engineExecutor.executeWithoutValueFilter(aggregationPlan);
}
return dataSet;
}
- protected AggregationExecutor getAggregationExecutor(AggregationPlan aggregationPlan) {
- return new AggregationExecutor(aggregationPlan);
+ protected AggregationExecutor getAggregationExecutor(
+ QueryContext context, AggregationPlan aggregationPlan) {
+ return new AggregationExecutor(context, aggregationPlan);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
index 867ad9d..c48b88e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
@@ -37,7 +37,7 @@ public interface IAggregateReader {
boolean canUseCurrentChunkStatistics() throws IOException;
- Statistics currentChunkStatistics();
+ Statistics currentChunkStatistics() throws IOException;
void skipCurrentChunk();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index ebd3db0..b6f2d3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -43,6 +45,7 @@ import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.page.VectorPageReader;
import java.io.IOException;
import java.io.Serializable;
@@ -273,6 +276,13 @@ public class SeriesReader {
return firstTimeSeriesMetadata.getStatistics();
}
+ Statistics currentFileStatistics(int index) throws IOException {
+ if (!(firstTimeSeriesMetadata instanceof VectorTimeSeriesMetadata)) {
+ throw new IOException("Can only get statistics by index from vectorTimeSeriesMetaData");
+ }
+ return ((VectorTimeSeriesMetadata) firstTimeSeriesMetadata).getStatistics(index);
+ }
+
boolean currentFileModified() throws IOException {
if (firstTimeSeriesMetadata == null) {
throw new IOException("no first file");
@@ -394,6 +404,13 @@ public class SeriesReader {
return firstChunkMetadata.getStatistics();
}
+ Statistics currentChunkStatistics(int index) throws IOException {
+ if (!(firstChunkMetadata instanceof VectorChunkMetadata)) {
+ throw new IOException("Can only get statistics by index from vectorChunkMetaData");
+ }
+ return ((VectorChunkMetadata) firstChunkMetadata).getStatistics(index);
+ }
+
boolean currentChunkModified() throws IOException {
if (firstChunkMetadata == null) {
throw new IOException("no first chunk");
@@ -615,6 +632,16 @@ public class SeriesReader {
return firstPageReader.getStatistics();
}
+ Statistics currentPageStatistics(int index) throws IOException {
+ if (firstPageReader == null) {
+ return null;
+ }
+ if (!(firstPageReader.isVectorPageReader())) {
+ throw new IOException("Can only get statistics by index from VectorPageReader");
+ }
+ return firstPageReader.getStatistics(index);
+ }
+
boolean currentPageModified() throws IOException {
if (firstPageReader == null) {
throw new IOException("no first page");
@@ -1023,10 +1050,21 @@ public class SeriesReader {
this.isSeq = isSeq;
}
+ public boolean isVectorPageReader() {
+ return data instanceof VectorPageReader;
+ }
+
Statistics getStatistics() {
return data.getStatistics();
}
+ Statistics getStatistics(int index) throws IOException {
+ if (!(data instanceof VectorPageReader)) {
+ throw new IOException("Can only get statistics by index from VectorPageReader");
+ }
+ return ((VectorPageReader) data).getStatistics(index);
+ }
+
BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
return data.getAllSatisfiedPageData(ascending);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
new file mode 100644
index 0000000..23b1243
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.series;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class VectorSeriesAggregateReader implements IAggregateReader {
+
+ private final SeriesReader seriesReader;
+ /**
+ * Used to locate the subSensor that we are traversing now. Use hasNextSubSeries() method to check
+ * if we have more sub series in one loop. And use nextSeries() method to move to next sub series.
+ */
+ private int curIndex = 0;
+
+ private final int subSensorSize;
+
+ public VectorSeriesAggregateReader(
+ VectorPartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter,
+ boolean ascending) {
+ this.seriesReader =
+ new SeriesReader(
+ seriesPath,
+ allSensors,
+ dataType,
+ context,
+ dataSource,
+ timeFilter,
+ valueFilter,
+ fileFilter,
+ ascending);
+ this.subSensorSize = seriesPath.getSubSensorsList().size();
+ }
+
+ @Override
+ public boolean isAscending() {
+ return seriesReader.getOrderUtils().getAscending();
+ }
+
+ @Override
+ public boolean hasNextFile() throws IOException {
+ return seriesReader.hasNextFile();
+ }
+
+ @Override
+ public boolean canUseCurrentFileStatistics() throws IOException {
+ Statistics fileStatistics = currentFileStatistics();
+ return !seriesReader.isFileOverlapped()
+ && containedByTimeFilter(fileStatistics)
+ && !seriesReader.currentFileModified();
+ }
+
+ @Override
+ public Statistics currentFileStatistics() throws IOException {
+ return seriesReader.currentFileStatistics(curIndex);
+ }
+
+ @Override
+ public void skipCurrentFile() {
+ seriesReader.skipCurrentFile();
+ }
+
+ @Override
+ public boolean hasNextChunk() throws IOException {
+ return seriesReader.hasNextChunk();
+ }
+
+ @Override
+ public boolean canUseCurrentChunkStatistics() throws IOException {
+ Statistics chunkStatistics = currentChunkStatistics();
+ return !seriesReader.isChunkOverlapped()
+ && containedByTimeFilter(chunkStatistics)
+ && !seriesReader.currentChunkModified();
+ }
+
+ @Override
+ public Statistics currentChunkStatistics() throws IOException {
+ return seriesReader.currentChunkStatistics(curIndex);
+ }
+
+ @Override
+ public void skipCurrentChunk() {
+ seriesReader.skipCurrentChunk();
+ }
+
+ @Override
+ public boolean hasNextPage() throws IOException {
+ return seriesReader.hasNextPage();
+ }
+
+ @Override
+ public boolean canUseCurrentPageStatistics() throws IOException {
+ Statistics currentPageStatistics = currentPageStatistics();
+ if (currentPageStatistics == null) {
+ return false;
+ }
+ return !seriesReader.isPageOverlapped()
+ && containedByTimeFilter(currentPageStatistics)
+ && !seriesReader.currentPageModified();
+ }
+
+ @Override
+ public Statistics currentPageStatistics() throws IOException {
+ return seriesReader.currentPageStatistics(curIndex);
+ }
+
+ @Override
+ public void skipCurrentPage() {
+ seriesReader.skipCurrentPage();
+ }
+
+ @Override
+ public BatchData nextPage() throws IOException {
+ return seriesReader.nextPage().flip();
+ }
+
+ private boolean containedByTimeFilter(Statistics statistics) {
+ Filter timeFilter = seriesReader.getTimeFilter();
+ return timeFilter == null
+ || timeFilter.containStartEndTime(statistics.getStartTime(), statistics.getEndTime());
+ }
+
+ public boolean hasNextSubSeries() {
+ if (getCurIndex() < subSensorSize) {
+ return true;
+ } else {
+ resetIndex();
+ return false;
+ }
+ }
+
+ public void nextSeries() {
+ curIndex++;
+ }
+
+ public int getCurIndex() {
+ return curIndex;
+ }
+
+ public void resetIndex() {
+ curIndex = 0;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
index 06fd982..08ae2c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
@@ -37,16 +37,9 @@ public class DescPriorityMergeReader extends PriorityMergeReader {
});
}
- /**
- * @param reader
- * @param priority
- * @param endTime
- * @param queryContext
- * @throws IOException
- */
@Override
public void addReader(
- IPointReader reader, MergeReaderPriority priority, long endTime, QueryContext queryContext)
+ IPointReader reader, MergeReaderPriority priority, long endTime, QueryContext context)
throws IOException {
if (reader.hasNextTimeValuePair()) {
heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index 2333032..f144216 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -667,24 +667,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- public void errorCaseTest1() throws ClassNotFoundException {
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
- statement.execute("select d0.s1, d0.s2, d1.s0 from root.vehicle align by device");
- fail("No exception thrown.");
- } catch (Exception e) {
- Assert.assertTrue(
- e.getMessage()
- .contains(
- "The paths of the SELECT clause can only be single level. In other words, "
- + "the paths of the SELECT clause can only be measurements or STAR, without DOT."));
- }
- }
-
- @Test
public void errorCaseTest3() throws ClassNotFoundException {
Class.forName(Config.JDBC_DRIVER_NAME);
try (Connection connection =
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index a77d592..2bd0a08 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -120,10 +120,10 @@ public class SeriesAggregateReaderTest {
}
while (seriesReader.hasNextPage()) {
- BatchData nextOverlappedPageData = seriesReader.nextPage();
- aggregateResult.updateResultFromPageData(nextOverlappedPageData);
- nextOverlappedPageData.resetBatchData();
- assertTrue(nextOverlappedPageData.hasCurrent());
+ IBatchDataIterator batchDataIterator = seriesReader.nextPage().getBatchDataIterator();
+ aggregateResult.updateResultFromPageData(batchDataIterator);
+ batchDataIterator.reset();
+ assertTrue(batchDataIterator.hasNext());
}
loopTime++;
}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
new file mode 100644
index 0000000..963b17a2
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBSessionVectorABDeviceIT {
+ private static final String ROOT_SG1_D1_VECTOR1 = "root.sg1.d1.vector1";
+ private static final String ROOT_SG1_D1 = "root.sg1.d1";
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+ private static Session session;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ CONFIG.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ EnvironmentUtils.envSetUp();
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+ createAlignedTimeseries();
+ prepareAlignedTimeSeriesData();
+ prepareNonAlignedTimeSeriesData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ session.close();
+ EnvironmentUtils.cleanEnv();
+ CONFIG.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ }
+
+ @Test
+ public void subMeasurementAlignByDeviceTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select vector1.s1, vector1.s2 from root.sg1.d1 limit 1 align by device");
+ assertEquals(4, dataSet.getColumnNames().size());
+ assertEquals("Time", dataSet.getColumnNames().get(0));
+ assertEquals("Device", dataSet.getColumnNames().get(1));
+ assertEquals("vector1.s1", dataSet.getColumnNames().get(2));
+ assertEquals("vector1.s2", dataSet.getColumnNames().get(3));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(1, rowRecord.getTimestamp());
+ assertEquals("root.sg1.d1", rowRecord.getFields().get(0).getBinaryV().toString());
+ assertEquals(2, rowRecord.getFields().get(1).getLongV());
+ assertEquals(3, rowRecord.getFields().get(2).getLongV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void vectorAlignByDeviceTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select vector1.* from root.sg1.d1 limit 1 align by device");
+ assertEquals(4, dataSet.getColumnNames().size());
+ assertEquals("Time", dataSet.getColumnNames().get(0));
+ assertEquals("Device", dataSet.getColumnNames().get(1));
+ assertEquals("vector1.s1", dataSet.getColumnNames().get(2));
+ assertEquals("vector1.s2", dataSet.getColumnNames().get(3));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(1, rowRecord.getTimestamp());
+ assertEquals("root.sg1.d1", rowRecord.getFields().get(0).getBinaryV().toString());
+ assertEquals(2, rowRecord.getFields().get(1).getLongV());
+ assertEquals(3, rowRecord.getFields().get(2).getLongV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ /** Ignore until the tablet interface. */
+ public void vectorAlignByDeviceWithWildcardTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement("select ** from root.sg1.d1 limit 1 align by device");
+ assertEquals(7, dataSet.getColumnNames().size());
+ assertEquals("Time", dataSet.getColumnNames().get(0));
+ assertEquals("Device", dataSet.getColumnNames().get(1));
+ assertEquals("s3", dataSet.getColumnNames().get(2));
+ assertEquals("s4", dataSet.getColumnNames().get(3));
+ assertEquals("s5", dataSet.getColumnNames().get(4));
+ assertEquals("vector1.s1", dataSet.getColumnNames().get(5));
+ assertEquals("vector1.s2", dataSet.getColumnNames().get(6));
+
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(1, rowRecord.getTimestamp());
+ assertEquals("root.sg1.d1", rowRecord.getFields().get(0).getBinaryV().toString());
+ assertEquals(2, rowRecord.getFields().get(4).getLongV());
+ assertEquals(3, rowRecord.getFields().get(5).getLongV());
+ assertEquals(4, rowRecord.getFields().get(1).getLongV());
+ assertEquals(5, rowRecord.getFields().get(2).getLongV());
+ assertEquals(6, rowRecord.getFields().get(3).getLongV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void vectorAggregationAlignByDeviceTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement("select count(vector1.*) from root.sg1.d1 align by device");
+ assertEquals(3, dataSet.getColumnNames().size());
+ assertEquals("Device", dataSet.getColumnNames().get(0));
+ assertEquals("count(vector1.s1)", dataSet.getColumnNames().get(1));
+ assertEquals("count(vector1.s2)", dataSet.getColumnNames().get(2));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals("root.sg1.d1", rowRecord.getFields().get(0).getBinaryV().toString());
+ assertEquals(100, rowRecord.getFields().get(1).getLongV());
+ assertEquals(100, rowRecord.getFields().get(2).getLongV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static void createAlignedTimeseries()
+ throws StatementExecutionException, IoTDBConnectionException {
+ List<String> measurements = new ArrayList<>();
+ for (int i = 1; i <= 2; i++) {
+ measurements.add("s" + i);
+ }
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.INT64);
+ dataTypes.add(TSDataType.INT64);
+ List<TSEncoding> encodings = new ArrayList<>();
+ for (int i = 1; i <= 2; i++) {
+ encodings.add(TSEncoding.RLE);
+ }
+ session.createAlignedTimeseries(
+ ROOT_SG1_D1_VECTOR1, measurements, dataTypes, encodings, CompressionType.SNAPPY, null);
+ }
+
+ private static void prepareAlignedTimeSeriesData()
+ throws StatementExecutionException, IoTDBConnectionException {
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ for (long time = 1; time <= 100; time++) {
+ List<Object> values = new ArrayList<>();
+ values.add(time + 1);
+ values.add(time + 2);
+ session.insertAlignedRecord(ROOT_SG1_D1_VECTOR1, time, measurements, types, values);
+ }
+ }
+
+ private static void prepareNonAlignedTimeSeriesData()
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ measurements.add("s3");
+ measurements.add("s4");
+ measurements.add("s5");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ for (long time = 1; time <= 100; time++) {
+ List<Object> values = new ArrayList<>();
+ values.add(time + 3L);
+ values.add(time + 4L);
+ values.add(time + 5L);
+ session.insertRecord(ROOT_SG1_D1, time, measurements, types, values);
+ }
+ }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
new file mode 100644
index 0000000..597061b
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBSessionVectorAggregationIT {
+
+ private static final String ROOT_SG1_D1_VECTOR1 = "root.sg1.d1.vector1";
+ private static final String ROOT_SG1_D1 = "root.sg1.d1";
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+ private static Session session;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ CONFIG.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ EnvironmentUtils.envSetUp();
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+ prepareAlignedTimeseriesData();
+ prepareNonAlignedTimeSeriesData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ session.close();
+ EnvironmentUtils.cleanEnv();
+ CONFIG.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ }
+
+ @Test
+ public void vectorAggregationCountTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement("select count(s1), count(s2) from root.sg1.d1.vector1");
+ assertEquals(2, dataSet.getColumnNames().size());
+ assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+ assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(100, rowRecord.getFields().get(0).getLongV());
+ assertEquals(100, rowRecord.getFields().get(1).getLongV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void vectorAggregationSumAvgTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement("select sum(s1), avg(s2) from root.sg1.d1.vector1");
+ assertEquals(2, dataSet.getColumnNames().size());
+ assertEquals("sum(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+ assertEquals("avg(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(5150, rowRecord.getFields().get(0).getDoubleV(), 0.01);
+ assertEquals(52.5, rowRecord.getFields().get(1).getDoubleV(), 0.01);
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void vectorAggregationMinMaxTimeTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select min_time(s1), min_time(s2), max_time(s1), max_time(s2) from root.sg1.d1.vector1");
+ assertEquals(4, dataSet.getColumnNames().size());
+ assertEquals("min_time(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+ assertEquals("min_time(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+ assertEquals("max_time(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(2));
+ assertEquals("max_time(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(3));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(1, rowRecord.getFields().get(0).getLongV());
+ assertEquals(1, rowRecord.getFields().get(1).getLongV());
+ assertEquals(100, rowRecord.getFields().get(2).getLongV());
+ assertEquals(100, rowRecord.getFields().get(3).getLongV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void vectorAggregationMinMaxValueTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select min_value(s1), max_value(s2) from root.sg1.d1.vector1");
+ assertEquals(2, dataSet.getColumnNames().size());
+ assertEquals("min_value(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+ assertEquals("max_value(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(2, rowRecord.getFields().get(0).getLongV());
+ assertEquals(102, rowRecord.getFields().get(1).getIntV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void vectorAggregationFirstLastValueTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select first_value(s1), last_value(s2) from root.sg1.d1.vector1");
+ assertEquals(2, dataSet.getColumnNames().size());
+ assertEquals("first_value(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+ assertEquals("last_value(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(2, rowRecord.getFields().get(0).getLongV());
+ assertEquals(102, rowRecord.getFields().get(1).getIntV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /** Test query vector time series and non aligned time series togther. */
+ @Test
+ public void vectorComplexTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select count(vector1.s1), max_value(s3), count(vector1.s2), min_time(s4) from root.sg1.d1");
+ assertEquals(4, dataSet.getColumnNames().size());
+ assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+ assertEquals("max_value(" + ROOT_SG1_D1 + ".s3)", dataSet.getColumnNames().get(1));
+ assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(2));
+ assertEquals("min_time(" + ROOT_SG1_D1 + ".s4)", dataSet.getColumnNames().get(3));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(100, rowRecord.getFields().get(0).getLongV());
+ assertEquals(103, rowRecord.getFields().get(1).getLongV());
+ assertEquals(100, rowRecord.getFields().get(2).getLongV());
+ assertEquals(1, rowRecord.getFields().get(3).getLongV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /** Method 1 for insert tablet with aligned timeseries */
+ private static void prepareAlignedTimeseriesData()
+ throws IoTDBConnectionException, StatementExecutionException {
+ // The schema of measurements of one device
+ // only measurementId and data type in MeasurementSchema take effects in Tablet
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(
+ new VectorMeasurementSchema(
+ "vector1",
+ new String[] {"s1", "s2"},
+ new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+
+ Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
+ tablet.setAligned(true);
+
+ for (long row = 1; row <= 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, row);
+ tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(0), rowIndex, row + 1);
+ tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(1), rowIndex, (int) (row + 2));
+
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ session.insertTablet(tablet, true);
+ tablet.reset();
+ }
+ }
+
+ if (tablet.rowSize != 0) {
+ session.insertTablet(tablet);
+ tablet.reset();
+ }
+ session.executeNonQueryStatement("flush");
+ }
+
+ private static void prepareNonAlignedTimeSeriesData()
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ measurements.add("s3");
+ measurements.add("s4");
+ measurements.add("s5");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ for (long time = 1; time <= 100; time++) {
+ List<Object> values = new ArrayList<>();
+ values.add(time + 3L);
+ values.add(time + 4L);
+ values.add(time + 5L);
+ session.insertRecord(ROOT_SG1_D1, time, measurements, types, values);
+ }
+ }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
new file mode 100644
index 0000000..c2fac60
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBSessionVectorAggregationWithUnSeqIT {
+
+ private static final String ROOT_SG1_D1_VECTOR1 = "root.sg1.d1.vector1";
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+ private static Session session;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ CONFIG.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ EnvironmentUtils.envSetUp();
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+ createAlignedTimeseries();
+ prepareAlignedTimeseriesDataWithUnSeq();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ session.close();
+ EnvironmentUtils.cleanEnv();
+ CONFIG.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ }
+
+ @Test
+ public void vectorAggregationCountTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement("select count(s1), count(s2) from root.sg1.d1.vector1");
+ assertEquals(2, dataSet.getColumnNames().size());
+ assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+ assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(200, rowRecord.getFields().get(0).getLongV());
+ assertEquals(200, rowRecord.getFields().get(1).getLongV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void vectorAggregationMinMaxTimeTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select min_time(s1), max_time(s1), min_time(s2), max_time(s2) from root.sg1.d1.vector1");
+ assertEquals(4, dataSet.getColumnNames().size());
+ assertEquals("min_time(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+ assertEquals("max_time(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(1));
+ assertEquals("min_time(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(2));
+ assertEquals("max_time(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(3));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(1, rowRecord.getFields().get(0).getLongV());
+ assertEquals(200, rowRecord.getFields().get(1).getLongV());
+ assertEquals(1, rowRecord.getFields().get(2).getLongV());
+ assertEquals(200, rowRecord.getFields().get(3).getLongV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void vectorAggregationMinMaxValueTest() {
+ try {
+ SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select min_value(s1), min_value(s2), max_value(s1), max_value(s2) from root.sg1.d1.vector1");
+ assertEquals(4, dataSet.getColumnNames().size());
+ assertEquals("min_value(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+ assertEquals("min_value(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+ assertEquals("max_value(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(2));
+ assertEquals("max_value(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(3));
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(2, rowRecord.getFields().get(0).getLongV());
+ assertEquals(3, rowRecord.getFields().get(1).getLongV());
+ assertEquals(201, rowRecord.getFields().get(2).getLongV());
+ assertEquals(202, rowRecord.getFields().get(3).getLongV());
+ dataSet.next();
+ }
+
+ dataSet.closeOperationHandle();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static void createAlignedTimeseries()
+ throws StatementExecutionException, IoTDBConnectionException {
+ List<String> measurements = new ArrayList<>();
+ for (int i = 1; i <= 2; i++) {
+ measurements.add("s" + i);
+ }
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.INT64);
+ dataTypes.add(TSDataType.INT64);
+ List<TSEncoding> encodings = new ArrayList<>();
+ for (int i = 1; i <= 2; i++) {
+ encodings.add(TSEncoding.RLE);
+ }
+ session.createAlignedTimeseries(
+ ROOT_SG1_D1_VECTOR1, measurements, dataTypes, encodings, CompressionType.SNAPPY, null);
+ }
+
+ private static void prepareAlignedTimeseriesDataWithUnSeq()
+ throws StatementExecutionException, IoTDBConnectionException {
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ for (long time = 1; time <= 200; time++) {
+ List<Object> values = new ArrayList<>();
+ values.add(time + 1);
+ values.add(time + 2);
+ session.insertAlignedRecord(ROOT_SG1_D1_VECTOR1, time, measurements, types, values);
+ }
+ session.executeNonQueryStatement("flush");
+
+ for (long time = 2; time <= 50; time += 2) {
+ List<Object> values = new ArrayList<>();
+ values.add(time + 1);
+ values.add(time + 2);
+ session.insertAlignedRecord(ROOT_SG1_D1_VECTOR1, time, measurements, types, values);
+ }
+ session.executeNonQueryStatement("flush");
+
+ for (long time = 150; time <= 200; time += 10) {
+ List<Object> values = new ArrayList<>();
+ values.add(time + 1);
+ values.add(time + 2);
+ session.insertAlignedRecord(ROOT_SG1_D1_VECTOR1, time, measurements, types, values);
+ }
+ session.executeNonQueryStatement("flush");
+ }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
similarity index 98%
rename from session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorIT.java
rename to session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
index 58cc28d..877ebb5 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.session;
import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -40,7 +41,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/** use session interface to IT for vector timeseries insert and select Black-box Testing */
-public class IoTDBSessionVectorIT {
+public class IoTDBSessionVectorInsertIT {
private static final String ROOT_SG1_D1_VECTOR1 = "root.sg_1.d1.vector";
private static final String ROOT_SG1_D1 = "root.sg_1.d1";
private static final String ROOT_SG1_D2 = "root.sg_1.d2";
@@ -64,7 +65,7 @@ public class IoTDBSessionVectorIT {
}
@Test
- public void alignedTabletTest() {
+ public void testInsertAlignedTablet() {
try {
insertTabletWithAlignedTimeseriesMethod();
session.executeNonQueryStatement("flush");
@@ -89,7 +90,7 @@ public class IoTDBSessionVectorIT {
}
@Test
- public void alignedSingleSelectTest() {
+ public void testInsertAlignedRecord() {
try {
insertAlignedRecord(ROOT_SG1_D1_VECTOR1);
session.executeNonQueryStatement("flush");
@@ -111,7 +112,7 @@ public class IoTDBSessionVectorIT {
}
@Test
- public void alignedStringSingleSelectTest() {
+ public void testInsertAlignedStringRecord() {
try {
insertAlignedStringRecord(ROOT_SG1_D1_VECTOR1);
session.executeNonQueryStatement("flush");
@@ -133,7 +134,7 @@ public class IoTDBSessionVectorIT {
}
@Test
- public void alignedStringRecordsSingleSelectTest() {
+ public void testInsertAlignedStringRecords() {
try {
insertAlignedStringRecords(ROOT_SG1_D1_VECTOR1);
session.executeNonQueryStatement("flush");
@@ -155,7 +156,7 @@ public class IoTDBSessionVectorIT {
}
@Test
- public void alignedRecordsSingleSelectTest() {
+ public void testInsertAlignedRecords() {
try {
insertAlignedRecords(ROOT_SG1_D1_VECTOR1);
session.executeNonQueryStatement("flush");
@@ -177,7 +178,7 @@ public class IoTDBSessionVectorIT {
}
@Test
- public void alignedRecordsOfOneDeviceSingleSelectTest() {
+ public void testInsertAlignedRecordsOfOneDevice() {
try {
insertAlignedRecordsOfOneDevice(ROOT_SG1_D1_VECTOR1);
session.executeNonQueryStatement("flush");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
index 6558da0..2bfda53 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
@@ -49,6 +49,10 @@ public class VectorChunkMetadata implements IChunkMetadata {
: timeChunkMetadata.getStatistics();
}
+ public Statistics getStatistics(int index) {
+ return valueChunkMetadataList.get(index).getStatistics();
+ }
+
@Override
public boolean isModified() {
return timeChunkMetadata.isModified();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
index 87e88af..c3d727f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
@@ -49,6 +49,10 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata {
: timeseriesMetadata.getStatistics();
}
+ public Statistics getStatistics(int index) {
+ return valueTimeseriesMetadataList.get(index).getStatistics();
+ }
+
@Override
public boolean isModified() {
return timeseriesMetadata.isModified();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 1005f34..4cb8f17 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.reader.BatchDataIterator;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsBinary;
@@ -170,6 +170,10 @@ public class BatchData {
return dataType;
}
+ public void setDataType(TSDataType dataType) {
+ this.dataType = dataType;
+ }
+
public BatchDataType getBatchDataType() {
return batchDataType;
}
@@ -633,7 +637,12 @@ public class BatchData {
}
public BatchDataIterator getBatchDataIterator() {
- return new BatchDataIterator(this);
+ return new BatchDataIterator();
+ }
+
+ /** Only used for the batch data of vector time series. */
+ public IBatchDataIterator getBatchDataIterator(int subIndex) {
+ return new VectorBatchDataIterator(subIndex);
}
/**
@@ -692,4 +701,75 @@ public class BatchData {
}
}
}
+
+ private class BatchDataIterator implements IPointReader, IBatchDataIterator {
+
+ @Override
+ public boolean hasNext() {
+ return BatchData.this.hasCurrent();
+ }
+
+ @Override
+ public void next() {
+ BatchData.this.next();
+ }
+
+ @Override
+ public long currentTime() {
+ return BatchData.this.currentTime();
+ }
+
+ @Override
+ public Object currentValue() {
+ return BatchData.this.currentValue();
+ }
+
+ @Override
+ public void reset() {
+ BatchData.this.resetBatchData();
+ }
+
+ @Override
+ public int totalLength() {
+ return BatchData.this.length();
+ }
+
+ @Override
+ public boolean hasNextTimeValuePair() {
+ return hasNext();
+ }
+
+ @Override
+ public TimeValuePair nextTimeValuePair() {
+ TimeValuePair timeValuePair = new TimeValuePair(currentTime(), currentTsPrimitiveType());
+ next();
+ return timeValuePair;
+ }
+
+ @Override
+ public TimeValuePair currentTimeValuePair() {
+ return new TimeValuePair(currentTime(), currentTsPrimitiveType());
+ }
+
+ @Override
+ public void close() {}
+ }
+
+ private class VectorBatchDataIterator extends BatchDataIterator {
+
+ private final int subIndex;
+
+ private VectorBatchDataIterator(int subIndex) {
+ this.subIndex = subIndex;
+ }
+
+ @Override
+ public Object currentValue() {
+ if (dataType == TSDataType.VECTOR) {
+ return getVector()[subIndex].getValue();
+ } else {
+ return null;
+ }
+ }
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
index ed6e6c2..ca14066 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.read.common;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.util.LinkedList;
@@ -70,6 +71,10 @@ public class DescReadWriteBatchData extends DescReadBatchData {
binaryRet = new LinkedList<>();
binaryRet.add(new Binary[capacity]);
break;
+ case VECTOR:
+ vectorRet = new LinkedList<>();
+ vectorRet.add(new TsPrimitiveType[capacity][]);
+ break;
default:
throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
@@ -297,6 +302,43 @@ public class DescReadWriteBatchData extends DescReadBatchData {
count++;
}
+ /**
+ * put vector data.
+ *
+ * @param t timestamp
+ * @param v vector data.
+ */
+ @Override
+ public void putVector(long t, TsPrimitiveType[] v) {
+ if (writeCurArrayIndex == -1) {
+ if (capacity >= CAPACITY_THRESHOLD) {
+ ((LinkedList<long[]>) timeRet).addFirst(new long[capacity]);
+ ((LinkedList<TsPrimitiveType[][]>) vectorRet).addFirst(new TsPrimitiveType[capacity][]);
+ writeCurListIndex++;
+ writeCurArrayIndex = capacity - 1;
+ } else {
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ TsPrimitiveType[][] newValueData = new TsPrimitiveType[newCapacity][];
+
+ System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity);
+ System.arraycopy(vectorRet.get(0), 0, newValueData, newCapacity - capacity, capacity);
+
+ timeRet.set(0, newTimeData);
+ vectorRet.set(0, newValueData);
+
+ writeCurArrayIndex = newCapacity - capacity - 1;
+ capacity = newCapacity;
+ }
+ }
+ timeRet.get(0)[writeCurArrayIndex] = t;
+ vectorRet.get(0)[writeCurArrayIndex] = v;
+
+ writeCurArrayIndex--;
+ count++;
+ }
+
@Override
public boolean hasCurrent() {
return (readCurListIndex == 0 && readCurArrayIndex > writeCurArrayIndex)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java
new file mode 100644
index 0000000..756aaff
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common;
+
+public interface IBatchDataIterator {
+
+ boolean hasNext();
+
+ void next();
+
+ long currentTime();
+
+ Object currentValue();
+
+ void reset();
+
+ int totalLength();
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/BatchDataIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/BatchDataIterator.java
deleted file mode 100644
index ae02551..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/BatchDataIterator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.reader;
-
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-
-public class BatchDataIterator implements IPointReader {
-
- private BatchData batchData;
-
- public BatchDataIterator(BatchData batchData) {
- this.batchData = batchData;
- }
-
- @Override
- public boolean hasNextTimeValuePair() {
- return batchData.hasCurrent();
- }
-
- @Override
- public TimeValuePair nextTimeValuePair() {
- TimeValuePair timeValuePair =
- new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
- batchData.next();
- return timeValuePair;
- }
-
- @Override
- public TimeValuePair currentTimeValuePair() {
- return new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
- }
-
- @Override
- public void close() {
- batchData = null;
- }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
index 09a2ab7..4c5290c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
@@ -101,7 +101,7 @@ public class VectorPageReader implements IPageReader {
pageData.putVector(timeBatch[i], v);
}
}
- return pageData;
+ return pageData.flip();
}
public void setDeleteIntervalList(List<List<TimeRange>> list) {
@@ -117,6 +117,10 @@ public class VectorPageReader implements IPageReader {
: timePageReader.getStatistics();
}
+ public Statistics getStatistics(int index) {
+ return valuePageReaderList.get(index).getStatistics();
+ }
+
@Override
public void setFilter(Filter filter) {
this.filter = filter;