You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/28 02:34:39 UTC
[incubator-iotdb] branch master updated: [IOTDB-340] Remove unnecessary getting data types from MManager (#677)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2200115 [IOTDB-340] Remove unnecessary getting data types from MManager (#677)
2200115 is described below
commit 22001156bd39e7bde092bb56c6184d7eef2892fd
Author: Dawei Liu <at...@163.com>
AuthorDate: Sat Dec 28 10:34:27 2019 +0800
[IOTDB-340] Remove unnecessary getting data types from MManager (#677)
* [IOTDB-340] Remove unnecessary getting data types from MManager when querying
---
.../iotdb/hadoop/tsfile/TSFRecordReader.java | 22 +++--
.../qp/executor/AbstractQueryProcessExecutor.java | 97 +++++-----------------
.../db/qp/executor/IQueryProcessExecutor.java | 20 ++---
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 39 +++++----
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 13 +++
.../iotdb/db/qp/physical/crud/QueryPlan.java | 42 ++++++++++
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 62 +++++++++++++-
.../db/query/dataset/DeviceIterateDataSet.java | 52 ++++++++----
.../dataset/groupby/GroupByEngineDataSet.java | 38 ++++-----
.../groupby/GroupByWithValueFilterDataSet.java | 36 ++++----
.../groupby/GroupByWithoutValueFilterDataSet.java | 24 +++---
.../db/query/executor/AggregateEngineExecutor.java | 35 ++++----
.../iotdb/db/query/executor/EngineExecutor.java | 59 ++++++-------
.../iotdb/db/query/executor/EngineQueryRouter.java | 68 ++++++++-------
.../db/query/executor/FillEngineExecutor.java | 16 ++--
.../db/query/executor/IEngineQueryRouter.java | 39 ++-------
.../db/engine/modification/DeletionQueryTest.java | 57 ++++++++++---
.../db/integration/IoTDBSequenceDataQueryIT.java | 90 +++++++++++++-------
.../iotdb/db/integration/IoTDBSeriesReaderIT.java | 93 ++++++++++++++-------
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 19 ++---
.../query/executor/GroupByEngineDataSetTest.java | 62 +++++++++++---
.../tsfile/read/expression/QueryExpression.java | 17 +++-
.../tsfile/read/query/dataset/QueryDataSet.java | 4 -
23 files changed, 617 insertions(+), 387 deletions(-)
diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java
index 3c4d29c..0bf0353 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java
@@ -18,8 +18,21 @@
*/
package org.apache.iotdb.hadoop.tsfile;
+import static java.util.stream.Collectors.toList;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -35,13 +48,6 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static java.util.stream.Collectors.toList;
-
/**
* @author Yuan Tian
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
index e0d768d..d137ca8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
@@ -19,6 +19,17 @@
package org.apache.iotdb.db.qp.executor;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
@@ -30,7 +41,11 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.MNode;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
@@ -45,18 +60,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.*;
-
public abstract class AbstractQueryProcessExecutor implements IQueryProcessExecutor {
IEngineQueryRouter queryRouter = new EngineQueryRouter();
@@ -209,35 +215,17 @@ public abstract class AbstractQueryProcessExecutor implements IQueryProcessExecu
if (queryPlan.isGroupByDevice()) {
queryDataSet = new DeviceIterateDataSet(queryPlan, context, queryRouter);
} else {
- // deduplicate executed paths and aggregations if exist
- List<Path> deduplicatedPaths = new ArrayList<>();
if (queryPlan instanceof GroupByPlan) {
GroupByPlan groupByPlan = (GroupByPlan) queryPlan;
- List<String> deduplicatedAggregations = new ArrayList<>();
- deduplicate(groupByPlan.getPaths(), groupByPlan.getAggregations(), deduplicatedPaths,
- deduplicatedAggregations);
- return groupBy(deduplicatedPaths, deduplicatedAggregations, groupByPlan.getExpression(),
- groupByPlan.getUnit(),
- groupByPlan.getSlidingStep(), groupByPlan.getStartTime(), groupByPlan.getEndTime(), context);
+ return groupBy(groupByPlan, context);
} else if (queryPlan instanceof AggregationPlan) {
- List<String> deduplicatedAggregations = new ArrayList<>();
- deduplicate(queryPlan.getPaths(), queryPlan.getAggregations(), deduplicatedPaths,
- deduplicatedAggregations);
- queryDataSet = aggregate(deduplicatedPaths, deduplicatedAggregations,
- queryPlan.getExpression(),
- context);
+ AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
+ queryDataSet = aggregate(aggregationPlan, context);
} else if (queryPlan instanceof FillQueryPlan) {
FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
- deduplicate(queryPlan.getPaths(), deduplicatedPaths);
- queryDataSet = fill(deduplicatedPaths, fillQueryPlan.getQueryTime(),
- fillQueryPlan.getFillType(),
- context);
+ queryDataSet = fill(fillQueryPlan, context);
} else {
- deduplicate(queryPlan.getPaths(), deduplicatedPaths);
- QueryExpression queryExpression = QueryExpression.create()
- .setSelectSeries(deduplicatedPaths)
- .setExpression(queryPlan.getExpression());
- queryDataSet = queryRouter.query(queryExpression, context);
+ queryDataSet = queryRouter.query(queryPlan, context);
}
}
queryDataSet.setRowLimit(queryPlan.getRowLimit());
@@ -245,49 +233,6 @@ public abstract class AbstractQueryProcessExecutor implements IQueryProcessExecu
return queryDataSet;
}
- /**
- * Note that the deduplication strategy must be consistent with that of IoTDBQueryResultSet.
- */
- private void deduplicate(List<Path> paths, List<String> aggregations,
- List<Path> deduplicatedPaths,
- List<String> deduplicatedAggregations) throws QueryProcessException {
- if (paths == null || aggregations == null || deduplicatedPaths == null
- || deduplicatedAggregations == null) {
- throw new QueryProcessException("Parameters should not be null.");
- }
- if (paths.size() != aggregations.size()) {
- throw new QueryProcessException(
- "The size of the path list does not equal that of the aggregation list.");
- }
- Set<String> columnSet = new HashSet<>();
- for (int i = 0; i < paths.size(); i++) {
- String column = aggregations.get(i) + "(" + paths.get(i).toString() + ")";
- if (!columnSet.contains(column)) {
- deduplicatedPaths.add(paths.get(i));
- deduplicatedAggregations.add(aggregations.get(i));
- columnSet.add(column);
- }
- }
- }
-
- /**
- * Note that the deduplication strategy must be consistent with that of IoTDBQueryResultSet.
- */
- private void deduplicate(List<Path> paths, List<Path> deduplicatedPaths)
- throws QueryProcessException {
- if (paths == null || deduplicatedPaths == null) {
- throw new QueryProcessException("Parameters should not be null.");
- }
- Set<String> columnSet = new HashSet<>();
- for (Path path : paths) {
- String column = path.toString();
- if (!columnSet.contains(column)) {
- deduplicatedPaths.add(path);
- columnSet.add(column);
- }
- }
- }
-
@Override
public void delete(DeletePlan deletePlan) throws QueryProcessException {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index a7bc1b5..6ec80b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -26,8 +26,11 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.fill.IFill;
@@ -60,31 +63,28 @@ public interface IQueryProcessExecutor {
/**
* process aggregate plan of qp layer, construct queryDataSet.
*/
- QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
- QueryContext context)
+ QueryDataSet aggregate(AggregationPlan aggregationPlan, QueryContext context)
throws IOException, QueryProcessException, StorageEngineException, QueryFilterOptimizationException;
/**
* process group by plan of qp layer, construct queryDataSet.
*/
- QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
- long unit, long slidingStep, long startTime, long endTime, QueryContext context)
+ QueryDataSet groupBy(GroupByPlan groupByPlan, QueryContext context)
throws IOException, QueryProcessException, StorageEngineException, QueryFilterOptimizationException;
/**
* process fill plan of qp layer, construct queryDataSet.
*/
- QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
- QueryContext context)
+ QueryDataSet fill(FillQueryPlan fillQueryPlan, QueryContext context)
throws IOException, QueryProcessException, StorageEngineException;
/**
* execute update command and return whether the operator is successful.
*
- * @param path : update series seriesPath
+ * @param path : update series seriesPath
* @param startTime start time in update command
- * @param endTime end time in update command
- * @param value - in type of string
+ * @param endTime end time in update command
+ * @param value - in type of string
*/
void update(Path path, long startTime, long endTime, String value)
throws QueryProcessException;
@@ -99,7 +99,7 @@ public interface IQueryProcessExecutor {
/**
* execute delete command and return whether the operator is successful.
*
- * @param path : delete series seriesPath
+ * @param path : delete series seriesPath
* @param deleteTime end time in delete command
*/
void delete(Path path, long deleteTime) throws QueryProcessException;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index f846f37..acc58e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -41,8 +41,11 @@ import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
import org.apache.iotdb.db.qp.logical.sys.PropertyOperator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.qp.physical.sys.*;
@@ -185,10 +188,11 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
try {
// check file
RestorableTsFileIOWriter restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
- if (restorableTsFileIOWriter.hasCrashed()){
+ if (restorableTsFileIOWriter.hasCrashed()) {
restorableTsFileIOWriter.close();
throw new QueryProcessException(
- String.format("Cannot load file %s because the file has crashed.", file.getAbsolutePath()));
+ String.format("Cannot load file %s because the file has crashed.",
+ file.getAbsolutePath()));
}
Map<String, MeasurementSchema> schemaMap = new HashMap<>();
List<ChunkGroupMetaData> chunkGroupMetaData = new ArrayList<>();
@@ -205,7 +209,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
}
//create schemas if they doesn't exist
- if(plan.isAutoCreateSchema()) {
+ if (plan.isAutoCreateSchema()) {
createSchemaAutomatically(chunkGroupMetaData, schemaMap, plan.getSgLevel());
}
@@ -299,25 +303,21 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
}
@Override
- public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
- QueryContext context) throws StorageEngineException, QueryFilterOptimizationException,
- QueryProcessException, IOException {
- return queryRouter.aggregate(paths, aggres, expression, context);
+ public QueryDataSet aggregate(AggregationPlan aggregationPlan, QueryContext context)
+ throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException, IOException {
+ return queryRouter.aggregate(aggregationPlan, context);
}
@Override
- public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
- QueryContext context)
+ public QueryDataSet fill(FillQueryPlan fillQueryPlan, QueryContext context)
throws IOException, QueryProcessException, StorageEngineException {
- return queryRouter.fill(fillPaths, queryTime, fillTypes, context);
+ return queryRouter.fill(fillQueryPlan, context);
}
@Override
- public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
- long unit, long slidingStep, long startTime, long endTime,
- QueryContext context)
- throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException, IOException {
- return queryRouter.groupBy(paths, aggres, expression, unit, slidingStep, startTime, endTime, context);
+ public QueryDataSet groupBy(GroupByPlan groupByPlan, QueryContext context)
+ throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException, IOException {
+ return queryRouter.groupBy(groupByPlan, context);
}
@@ -370,7 +370,8 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
if (!node.hasChild(measurement)) {
if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
throw new QueryProcessException(
- String.format("Current deviceId[%s] does not contain measurement:%s", deviceId, measurement));
+ String.format("Current deviceId[%s] does not contain measurement:%s", deviceId,
+ measurement));
}
try {
addPathToMTree(deviceId, measurement, strValue);
@@ -388,7 +389,8 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
return measurementNode;
}
- private void checkPathExists(MNode node, String fullPath, MeasurementSchema schema, boolean autoCreateSchema)
+ private void checkPathExists(MNode node, String fullPath, MeasurementSchema schema,
+ boolean autoCreateSchema)
throws QueryProcessException, StorageEngineException, MetadataException {
// check if timeseries exists
String measurement = schema.getMeasurementId();
@@ -398,7 +400,8 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
String.format("Path[%s] does not exist", fullPath));
}
try {
- addPathToMTree(fullPath, schema.getType(), schema.getEncodingType(), schema.getCompressor());
+ addPathToMTree(fullPath, schema.getType(), schema.getEncodingType(),
+ schema.getCompressor());
} catch (MetadataException e) {
if (!e.getMessage().contains("already exist")) {
throw e;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index f5d9815..9d8a6c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.qp.logical.Operator;
public class AggregationPlan extends QueryPlan {
private List<String> aggregations = new ArrayList<>();
+ private List<String> deduplicatedAggregations = new ArrayList<>();
public AggregationPlan() {
super();
@@ -39,4 +40,16 @@ public class AggregationPlan extends QueryPlan {
public void setAggregations(List<String> aggregations) {
this.aggregations = aggregations;
}
+
+ public List<String> getDeduplicatedAggregations() {
+ return deduplicatedAggregations;
+ }
+
+ public void addDeduplicatedAggregations(String aggregations) {
+ this.deduplicatedAggregations.add(aggregations);
+ }
+
+ public void setDeduplicatedAggregations(List<String> deduplicatedAggregations) {
+ this.deduplicatedAggregations = deduplicatedAggregations;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 44787b4..cd12d0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,6 +35,11 @@ public class QueryPlan extends PhysicalPlan {
private List<Path> paths = null;
private List<TSDataType> dataTypes = null;
+
+ private List<Path> deduplicatedPaths = new ArrayList<>();
+ private List<TSDataType> deduplicatedDataTypes = new ArrayList<>();
+
+
private IExpression expression = null;
private int rowLimit = 0;
@@ -42,6 +49,7 @@ public class QueryPlan extends PhysicalPlan {
private List<String> measurementColumnList; // for group by device sql
private Map<String, Set<String>> measurementColumnsGroupByDevice; // for group by device sql
private Map<String, TSDataType> dataTypeConsistencyChecker; // for group by device sql
+ private Map<Path, TSDataType> dataTypeMapping = new HashMap<>(); // for group by device sql
public QueryPlan() {
super(true);
@@ -88,6 +96,22 @@ public class QueryPlan extends PhysicalPlan {
this.dataTypes = dataTypes;
}
+ public List<Path> getDeduplicatedPaths() {
+ return deduplicatedPaths;
+ }
+
+ public void addDeduplicatedPaths(Path path) {
+ this.deduplicatedPaths.add(path);
+ }
+
+ public List<TSDataType> getDeduplicatedDataTypes() {
+ return deduplicatedDataTypes;
+ }
+
+ public void addDeduplicatedDataTypes(TSDataType dataType) {
+ this.deduplicatedDataTypes.add(dataType);
+ }
+
public int getRowLimit() {
return rowLimit;
}
@@ -141,4 +165,22 @@ public class QueryPlan extends PhysicalPlan {
public Map<String, TSDataType> getDataTypeConsistencyChecker() {
return dataTypeConsistencyChecker;
}
+
+ public Map<Path, TSDataType> getDataTypeMapping() {
+ return dataTypeMapping;
+ }
+
+ public void addTypeMapping(Path path, TSDataType dataType) {
+ dataTypeMapping.put(path, dataType);
+ }
+
+ public void setDeduplicatedPaths(
+ List<Path> deduplicatedPaths) {
+ this.deduplicatedPaths = deduplicatedPaths;
+ }
+
+ public void setDeduplicatedDataTypes(
+ List<TSDataType> deduplicatedDataTypes) {
+ this.deduplicatedDataTypes = deduplicatedDataTypes;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 077d256..da49472 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.LogicalOperatorException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -329,12 +330,14 @@ public class PhysicalGenerator {
queryPlan.setMeasurementColumnsGroupByDevice(measurementColumnsGroupByDevice);
queryPlan.setDataTypeConsistencyChecker(dataTypeConsistencyChecker);
queryPlan.setPaths(new ArrayList<>(allSelectPaths));
+ List<Path> paths = queryPlan.getPaths();
+ queryPlan.setDeduplicatedPaths(paths);
} else {
List<Path> paths = queryOperator.getSelectedPaths();
queryPlan.setPaths(paths);
}
-
- queryPlan.checkPaths(executor);
+ generateDataTypes(queryPlan);
+ deduplicate(queryPlan);
// transform filter operator to expression
FilterOperator filterOperator = queryOperator.getFilterOperator();
@@ -350,6 +353,42 @@ public class PhysicalGenerator {
return queryPlan;
}
+ private void generateDataTypes(QueryPlan queryPlan) throws PathException {
+ List<Path> paths = queryPlan.getPaths();
+ List<TSDataType> dataTypes = new ArrayList<>(paths.size());
+ for (int i = 0; i < paths.size(); i++) {
+ Path path = paths.get(i);
+ TSDataType seriesType = executor.getSeriesType(path);
+ dataTypes.add(seriesType);
+ queryPlan.addTypeMapping(path, seriesType);
+ }
+ queryPlan.setDataTypes(dataTypes);
+ }
+
+ private void deduplicate(QueryPlan queryPlan) {
+ if (queryPlan instanceof AggregationPlan) {
+ if (!queryPlan.isGroupByDevice()) {
+ AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
+ deduplicateAggregation(aggregationPlan);
+ }
+ return;
+ }
+ List<Path> paths = queryPlan.getPaths();
+
+ Set<String> columnSet = new HashSet<>();
+ Map<Path, TSDataType> dataTypeMapping = queryPlan.getDataTypeMapping();
+ for (int i = 0; i < paths.size(); i++) {
+ Path path = paths.get(i);
+ String column = path.toString();
+ if (!columnSet.contains(column)) {
+ TSDataType seriesType = dataTypeMapping.get(path);
+ queryPlan.addDeduplicatedPaths(path);
+ queryPlan.addDeduplicatedDataTypes(seriesType);
+ columnSet.add(column);
+ }
+ }
+ }
+
private List<String> slimitTrimColumn(List<String> columnList, int seriesLimit, int seriesOffset)
throws QueryProcessException {
@@ -368,5 +407,24 @@ public class PhysicalGenerator {
return new ArrayList<>(columnList.subList(seriesOffset, endPosition));
}
+
+ private void deduplicateAggregation(AggregationPlan queryPlan) {
+ List<Path> paths = queryPlan.getPaths();
+ List<String> aggregations = queryPlan.getAggregations();
+
+ Set<String> columnSet = new HashSet<>();
+ Map<Path, TSDataType> dataTypeMapping = queryPlan.getDataTypeMapping();
+ for (int i = 0; i < paths.size(); i++) {
+ Path path = paths.get(i);
+ String column = aggregations.get(i) + "(" + path.toString() + ")";
+ if (!columnSet.contains(column)) {
+ queryPlan.addDeduplicatedPaths(path);
+ TSDataType seriesType = dataTypeMapping.get(path);
+ queryPlan.addDeduplicatedDataTypes(seriesType);
+ queryPlan.addDeduplicatedAggregations(aggregations.get(i));
+ columnSet.add(column);
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/DeviceIterateDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/DeviceIterateDataSet.java
index c348a35..2610738 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/DeviceIterateDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/DeviceIterateDataSet.java
@@ -25,7 +25,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
@@ -39,7 +41,6 @@ import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -72,6 +73,7 @@ public class DeviceIterateDataSet extends QueryDataSet {
private String currentDevice;
private QueryDataSet currentDataSet;
private int[] currentColumnMapRelation;
+ private Map<Path, TSDataType> tsDataTypeMap;
public DeviceIterateDataSet(QueryPlan queryPlan, QueryContext context,
IEngineQueryRouter queryRouter) {
@@ -79,7 +81,7 @@ public class DeviceIterateDataSet extends QueryDataSet {
// get deduplicated measurement columns (already deduplicated in TSServiceImpl.executeDataQuery)
this.deduplicatedMeasurementColumns = queryPlan.getMeasurementColumnList();
-
+ this.tsDataTypeMap = queryPlan.getDataTypeMapping();
this.queryRouter = queryRouter;
this.context = context;
this.measurementColumnsGroupByDevice = queryPlan.getMeasurementColumnsGroupByDevice();
@@ -148,35 +150,57 @@ public class DeviceIterateDataSet extends QueryDataSet {
}
// extract paths and aggregations if exist from executeColumns
List<Path> executePaths = new ArrayList<>();
+ List<TSDataType> tsDataTypes = new ArrayList<>();
List<String> executeAggregations = new ArrayList<>();
for (String column : executeColumns) {
if (dataSetType == DataSetType.GROUPBY || dataSetType == DataSetType.AGGREGATE) {
- executePaths.add(new Path(currentDevice,
- column.substring(column.indexOf("(") + 1, column.indexOf(")"))));
+ Path path = new Path(currentDevice,
+ column.substring(column.indexOf("(") + 1, column.indexOf(")")));
+ tsDataTypes.add(tsDataTypeMap.get(path));
+ executePaths.add(path);
executeAggregations.add(column.substring(0, column.indexOf("(")));
} else {
- executePaths.add(new Path(currentDevice, column));
+ Path path = new Path(currentDevice, column);
+ tsDataTypes.add(tsDataTypeMap.get(path));
+ executePaths.add(path);
}
}
try {
switch (dataSetType) {
case GROUPBY:
- currentDataSet = queryRouter
- .groupBy(executePaths, executeAggregations, expression, unit, slidingStep,
- startTime, endTime, context);
+ GroupByPlan groupByPlan = new GroupByPlan();
+ groupByPlan.setEndTime(endTime);
+ groupByPlan.setStartTime(startTime);
+ groupByPlan.setSlidingStep(slidingStep);
+ groupByPlan.setUnit(unit);
+ groupByPlan.setDeduplicatedPaths(executePaths);
+ groupByPlan.setDeduplicatedDataTypes(dataTypes);
+ groupByPlan.setDeduplicatedAggregations(executeAggregations);
+ currentDataSet = queryRouter.groupBy(groupByPlan, context);
break;
case AGGREGATE:
- currentDataSet = queryRouter
- .aggregate(executePaths, executeAggregations, expression, context);
+ AggregationPlan aggregationPlan = new AggregationPlan();
+ aggregationPlan.setDeduplicatedPaths(executePaths);
+ aggregationPlan.setDeduplicatedAggregations(executeAggregations);
+ aggregationPlan.setDeduplicatedDataTypes(dataTypes);
+ aggregationPlan.setExpression(expression);
+ currentDataSet = queryRouter.aggregate(aggregationPlan, context);
break;
case FILL:
- currentDataSet = queryRouter.fill(executePaths, queryTime, fillType, context);
+ FillQueryPlan fillQueryPlan = new FillQueryPlan();
+ fillQueryPlan.setFillType(fillType);
+ fillQueryPlan.setQueryTime(queryTime);
+ fillQueryPlan.setDeduplicatedDataTypes(tsDataTypes);
+ fillQueryPlan.setDeduplicatedPaths(executePaths);
+ currentDataSet = queryRouter.fill(fillQueryPlan, context);
break;
case QUERY:
- QueryExpression queryExpression = QueryExpression.create()
- .setSelectSeries(executePaths).setExpression(expression);
- currentDataSet = queryRouter.query(queryExpression, context);
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedPaths(executePaths);
+ queryPlan.setDeduplicatedDataTypes(tsDataTypes);
+ queryPlan.setExpression(expression);
+ currentDataSet = queryRouter.query(queryPlan, context);
break;
default:
throw new IOException("unsupported DataSetType");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 44a9d3a..6342130 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -18,21 +18,19 @@
*/
package org.apache.iotdb.db.query.dataset.groupby;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.db.exception.path.PathException;
-import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.AggreFuncFactory;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
-import java.util.ArrayList;
-import java.util.List;
-
public abstract class GroupByEngineDataSet extends QueryDataSet {
protected long queryId;
@@ -50,14 +48,13 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
/**
* groupBy query.
*/
- public GroupByEngineDataSet(long queryId, List<Path> paths, long unit,
- long slidingStep, long startTime, long endTime) {
- super(paths);
- this.queryId = queryId;
- this.unit = unit;
- this.slidingStep = slidingStep;
- this.intervalStartTime = startTime;
- this.intervalEndTime = endTime;
+ public GroupByEngineDataSet(QueryContext context, GroupByPlan groupByPlan) {
+ super(groupByPlan.getDeduplicatedPaths(), groupByPlan.getDeduplicatedDataTypes());
+ this.queryId = context.getQueryId();
+ this.unit = groupByPlan.getUnit();
+ this.slidingStep = groupByPlan.getSlidingStep();
+ this.intervalStartTime = groupByPlan.getStartTime();
+ this.intervalEndTime = groupByPlan.getEndTime();
this.functions = new ArrayList<>();
// init group by time partition
@@ -66,18 +63,15 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
this.endTime = -1;
}
- protected void initAggreFuction(List<String> aggres) throws PathException {
- List<TSDataType> types = new ArrayList<>();
+ protected void initAggreFuction(GroupByPlan groupByPlan) throws PathException {
// construct AggregateFunctions
for (int i = 0; i < paths.size(); i++) {
- TSDataType tsDataType = MManager.getInstance()
- .getSeriesType(paths.get(i).getFullPath());
- AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
+ AggregateFunction function = AggreFuncFactory
+ .getAggrFuncByName(groupByPlan.getDeduplicatedAggregations().get(i),
+ groupByPlan.getDeduplicatedDataTypes().get(i));
function.init();
functions.add(function);
- types.add(function.getResultDataType());
}
- super.setDataTypes(types);
}
@Override
@@ -91,7 +85,7 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
usedIndex++;
if (startTime <= intervalEndTime) {
hasCachedTimeInterval = true;
- endTime = Math.min(startTime + unit, intervalEndTime+1);
+ endTime = Math.min(startTime + unit, intervalEndTime + 1);
return true;
} else {
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 2d2ac2a..a6c8262 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -19,9 +19,13 @@
package org.apache.iotdb.db.query.dataset.groupby;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.path.PathException;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
@@ -29,13 +33,8 @@ import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
private List<IReaderByTimestamp> allDataReaderList;
@@ -57,9 +56,17 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
/**
* constructor.
*/
- public GroupByWithValueFilterDataSet(long queryId, List<Path> paths, long unit,
- long slidingStep, long startTime, long endTime) {
- super(queryId, paths, unit, slidingStep, startTime, endTime);
+ public GroupByWithValueFilterDataSet(QueryContext context, GroupByPlan groupByPlan)
+ throws PathException, IOException, StorageEngineException {
+ super(context, groupByPlan);
+ this.allDataReaderList = new ArrayList<>();
+ this.timeStampFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
+ initGroupBy(context, groupByPlan);
+ }
+
+ public GroupByWithValueFilterDataSet(long queryId, GroupByPlan groupByPlan)
+ throws PathException, IOException, StorageEngineException {
+ super(new QueryContext(queryId), groupByPlan);
this.allDataReaderList = new ArrayList<>();
this.timeStampFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
}
@@ -67,11 +74,10 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
/**
* init reader and aggregate function.
*/
- public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression)
- throws StorageEngineException, QueryProcessException, IOException {
- initAggreFuction(aggres);
-
- this.timestampGenerator = new EngineTimeGenerator(expression, context);
+ private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
+ throws StorageEngineException, IOException, PathException {
+ initAggreFuction(groupByPlan);
+ this.timestampGenerator = new EngineTimeGenerator(groupByPlan.getExpression(), context);
this.allDataReaderList = new ArrayList<>();
for (Path path : paths) {
SeriesReaderByTimestamp seriesReaderByTimestamp = new SeriesReaderByTimestamp(path, context);
@@ -134,7 +140,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
/**
* construct an array of timestamps for one batch of a group by partition calculating.
*
- * @param timestampArray timestamp array
+ * @param timestampArray timestamp array
* @param timeArrayLength the current size of timestamp array
* @return time array size
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index d44ce26..9d9ed28 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -31,6 +32,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.resourceRelated.OldUnseqResourceMergeReader;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.*;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
@@ -52,9 +54,9 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
/**
* constructor.
*/
- public GroupByWithoutValueFilterDataSet(long queryId, List<Path> paths, long unit,
- long slidingStep, long startTime, long endTime) {
- super(queryId, paths, unit, slidingStep, startTime, endTime);
+ public GroupByWithoutValueFilterDataSet(QueryContext context, GroupByPlan groupByPlan)
+ throws PathException, IOException, StorageEngineException {
+ super(context, groupByPlan);
this.unSequenceReaderList = new ArrayList<>();
this.sequenceReaderList = new ArrayList<>();
@@ -65,14 +67,16 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
hasCachedSequenceDataList.add(false);
batchDataList.add(null);
}
+ initGroupBy(context, groupByPlan);
}
/**
* init reader and aggregate function.
*/
- public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression)
- throws StorageEngineException, PathException, IOException {
- initAggreFuction(aggres);
+ private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
+ throws StorageEngineException, IOException, PathException {
+ IExpression expression = groupByPlan.getExpression();
+ initAggreFuction(groupByPlan);
// init reader
if (expression != null) {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
@@ -184,8 +188,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
/**
* calculate groupBy's result in batch data.
*
- * @param idx series index
- * @param function aggregate function of the series
+ * @param idx series index
+ * @param function aggregate function of the series
* @param unsequenceReader unsequence reader of the series
* @return if all sequential data been computed
*/
@@ -213,8 +217,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
/**
* skip the points with timestamp less than startTime.
*
- * @param idx the index of series
- * @param sequenceReader sequence Reader
+ * @param idx the index of series
+ * @param sequenceReader sequence Reader
* @param unsequenceReader unsequence Reader
* @throws IOException exception when reading file
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 2dc1b4b..3732123 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.impl.LastValueAggrFunc;
@@ -37,10 +37,9 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
import org.apache.iotdb.db.query.dataset.OldEngineDataSetWithoutValueFilter;
import org.apache.iotdb.db.query.factory.AggreFuncFactory;
-import org.apache.iotdb.db.query.reader.resourceRelated.OldUnseqResourceMergeReader;
-import org.apache.iotdb.tsfile.read.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.resourceRelated.OldUnseqResourceMergeReader;
import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
@@ -52,10 +51,12 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.reader.IAggregateReader;
public class AggregateEngineExecutor {
private List<Path> selectedSeries;
+ private List<TSDataType> dataTypes;
private List<String> aggres;
private IExpression expression;
@@ -67,11 +68,11 @@ public class AggregateEngineExecutor {
/**
* constructor.
*/
- public AggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres,
- IExpression expression) {
- this.selectedSeries = selectedSeries;
- this.aggres = aggres;
- this.expression = expression;
+ public AggregateEngineExecutor(AggregationPlan aggregationPlan) {
+ this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
+ this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
+ this.aggres = aggregationPlan.getDeduplicatedAggregations();
+ this.expression = aggregationPlan.getExpression();
this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
}
@@ -92,8 +93,7 @@ public class AggregateEngineExecutor {
List<AggregateFunction> aggregateFunctions = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
// construct AggregateFunction
- TSDataType tsDataType = MManager.getInstance()
- .getSeriesType(selectedSeries.get(i).getFullPath());
+ TSDataType tsDataType = dataTypes.get(i);
AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
function.init();
aggregateFunctions.add(function);
@@ -134,10 +134,10 @@ public class AggregateEngineExecutor {
/**
* calculation aggregate result with only time filter or no filter for one series.
*
- * @param function aggregate function
- * @param sequenceReader sequence data reader
+ * @param function aggregate function
+ * @param sequenceReader sequence data reader
* @param unSequenceReader unsequence data reader
- * @param filter time filter or null
+ * @param filter time filter or null
* @return one series aggregate result data
*/
private AggreResultData aggregateWithoutValueFilter(AggregateFunction function,
@@ -202,8 +202,8 @@ public class AggregateEngineExecutor {
/**
* handle last and max_time aggregate function with only time filter or no filter.
*
- * @param function aggregate function
- * @param sequenceReader sequence data reader
+ * @param function aggregate function
+ * @param sequenceReader sequence data reader
* @param unSequenceReader unsequence data reader
* @return BatchData-aggregate result
*/
@@ -271,7 +271,7 @@ public class AggregateEngineExecutor {
List<AggregateFunction> aggregateFunctions = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
- TSDataType type = MManager.getInstance().getSeriesType(selectedSeries.get(i).getFullPath());
+ TSDataType type = dataTypes.get(i);
AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), type);
function.init();
aggregateFunctions.add(function);
@@ -330,6 +330,7 @@ public class AggregateEngineExecutor {
dataTypes.add(resultData.getDataType());
resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
}
- return new OldEngineDataSetWithoutValueFilter(selectedSeries, dataTypes, resultDataPointReaders);
+ return new OldEngineDataSetWithoutValueFilter(selectedSeries, dataTypes,
+ resultDataPointReaders);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
index 64c9f3e..68236d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
@@ -21,9 +21,7 @@ package org.apache.iotdb.db.query.executor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithValueFilter;
import org.apache.iotdb.db.query.dataset.NewEngineDataSetWithoutValueFilter;
@@ -33,7 +31,7 @@ import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFi
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
@@ -44,10 +42,20 @@ import org.apache.iotdb.tsfile.read.reader.IBatchReader;
*/
public class EngineExecutor {
- private QueryExpression queryExpression;
+ private List<Path> deduplicatedPaths;
+ private List<TSDataType> deduplicatedDataTypes;
+ private IExpression optimizedExpression;
- public EngineExecutor(QueryExpression queryExpression) {
- this.queryExpression = queryExpression;
+ public EngineExecutor(List<Path> deduplicatedPaths, List<TSDataType> deduplicatedDataTypes,
+ IExpression optimizedExpression) {
+ this.deduplicatedPaths = deduplicatedPaths;
+ this.deduplicatedDataTypes = deduplicatedDataTypes;
+ this.optimizedExpression = optimizedExpression;
+ }
+
+ public EngineExecutor(List<Path> deduplicatedPaths, List<TSDataType> deduplicatedDataTypes) {
+ this.deduplicatedPaths = deduplicatedPaths;
+ this.deduplicatedDataTypes = deduplicatedDataTypes;
}
/**
@@ -57,28 +65,22 @@ public class EngineExecutor {
throws StorageEngineException, IOException {
Filter timeFilter = null;
- if (queryExpression.hasQueryFilter()) {
- timeFilter = ((GlobalTimeExpression) queryExpression.getExpression()).getFilter();
+ if (optimizedExpression != null) {
+ timeFilter = ((GlobalTimeExpression) optimizedExpression).getFilter();
}
List<IBatchReader> readersOfSelectedSeries = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
- for (Path path : queryExpression.getSelectedSeries()) {
- TSDataType dataType;
- try {
- // add data type
- dataType = MManager.getInstance().getSeriesType(path.getFullPath());
- dataTypes.add(dataType);
- } catch (PathException e) {
- throw new StorageEngineException(e);
- }
+ for (int i = 0; i < deduplicatedPaths.size(); i++) {
+ Path path = deduplicatedPaths.get(i);
+ TSDataType dataType = deduplicatedDataTypes.get(i);
- IBatchReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context, true);
+ IBatchReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context,
+ true);
readersOfSelectedSeries.add(reader);
}
try {
- return new NewEngineDataSetWithoutValueFilter(queryExpression.getSelectedSeries(), dataTypes,
+ return new NewEngineDataSetWithoutValueFilter(deduplicatedPaths, deduplicatedDataTypes,
readersOfSelectedSeries);
} catch (IOException e) {
throw new StorageEngineException(e.getMessage());
@@ -91,25 +93,18 @@ public class EngineExecutor {
* @return QueryDataSet object
* @throws StorageEngineException StorageEngineException
*/
- public QueryDataSet executeWithValueFilter(QueryContext context) throws StorageEngineException, IOException {
+ public QueryDataSet executeWithValueFilter(QueryContext context)
+ throws StorageEngineException, IOException {
EngineTimeGenerator timestampGenerator = new EngineTimeGenerator(
- queryExpression.getExpression(), context);
+ optimizedExpression, context);
List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
- for (Path path : queryExpression.getSelectedSeries()) {
- try {
- // add data type
- dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
- } catch (PathException e) {
- throw new StorageEngineException(e);
- }
-
+ for (Path path : deduplicatedPaths) {
SeriesReaderByTimestamp seriesReaderByTimestamp = new SeriesReaderByTimestamp(path, context);
readersOfSelectedSeries.add(seriesReaderByTimestamp);
}
- return new EngineDataSetWithValueFilter(queryExpression.getSelectedSeries(), dataTypes,
+ return new EngineDataSetWithValueFilter(deduplicatedPaths, deduplicatedDataTypes,
timestampGenerator, readersOfSelectedSeries);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 76f449d..a5cab17 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -21,6 +21,10 @@ package org.apache.iotdb.db.query.executor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
import org.apache.iotdb.db.query.dataset.groupby.GroupByWithoutValueFilterDataSet;
@@ -35,7 +39,6 @@ import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
@@ -49,15 +52,18 @@ import java.util.Map;
public class EngineQueryRouter implements IEngineQueryRouter {
@Override
- public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
+ public QueryDataSet query(QueryPlan queryPlan, QueryContext context)
throws StorageEngineException {
+ IExpression expression = queryPlan.getExpression();
+ List<Path> deduplicatedPaths = queryPlan.getDeduplicatedPaths();
+ List<TSDataType> deduplicatedDataTypes = queryPlan.getDeduplicatedDataTypes();
- if (queryExpression.hasQueryFilter()) {
+ if (expression != null) {
try {
IExpression optimizedExpression = ExpressionOptimizer.getInstance()
- .optimize(queryExpression.getExpression(), queryExpression.getSelectedSeries());
- queryExpression.setExpression(optimizedExpression);
- EngineExecutor engineExecutor = new EngineExecutor(queryExpression);
+ .optimize(expression, deduplicatedPaths);
+ EngineExecutor engineExecutor = new EngineExecutor(deduplicatedPaths, deduplicatedDataTypes,
+ optimizedExpression);
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
return engineExecutor.executeWithoutValueFilter(context);
} else {
@@ -68,7 +74,7 @@ public class EngineQueryRouter implements IEngineQueryRouter {
throw new StorageEngineException(e.getMessage());
}
} else {
- EngineExecutor engineExecutor = new EngineExecutor(queryExpression);
+ EngineExecutor engineExecutor = new EngineExecutor(deduplicatedPaths, deduplicatedDataTypes);
try {
return engineExecutor.executeWithoutValueFilter(context);
} catch (IOException e) {
@@ -78,14 +84,16 @@ public class EngineQueryRouter implements IEngineQueryRouter {
}
@Override
- public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
- IExpression expression, QueryContext context) throws QueryFilterOptimizationException,
- StorageEngineException, QueryProcessException, IOException {
+ public QueryDataSet aggregate(AggregationPlan aggregationPlan, QueryContext context)
+ throws QueryFilterOptimizationException, StorageEngineException, QueryProcessException, IOException {
+ IExpression expression = aggregationPlan.getExpression();
+ List<Path> selectedSeries = aggregationPlan.getDeduplicatedPaths();
+
if (expression != null) {
IExpression optimizedExpression = ExpressionOptimizer.getInstance()
.optimize(expression, selectedSeries);
AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
- selectedSeries, aggres, optimizedExpression);
+ aggregationPlan);
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
return engineExecutor.executeWithoutValueFilter(context);
} else {
@@ -93,22 +101,25 @@ public class EngineQueryRouter implements IEngineQueryRouter {
}
} else {
AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
- selectedSeries, aggres, null);
+ aggregationPlan);
return engineExecutor.executeWithoutValueFilter(context);
}
}
@Override
- public QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
- IExpression expression, long unit, long slidingStep, long startTime, long endTime,
- QueryContext context)
- throws QueryFilterOptimizationException, StorageEngineException,
- QueryProcessException, IOException {
+ public QueryDataSet groupBy(GroupByPlan groupByPlan, QueryContext context)
+ throws QueryFilterOptimizationException, StorageEngineException, QueryProcessException, IOException {
+ long unit = groupByPlan.getUnit();
+ long slidingStep = groupByPlan.getSlidingStep();
+ long startTime = groupByPlan.getStartTime();
+ long endTime = groupByPlan.getEndTime();
- long queryId = context.getQueryId();
+ IExpression expression = groupByPlan.getExpression();
+ List<Path> selectedSeries = groupByPlan.getDeduplicatedPaths();
- GlobalTimeExpression timeExpression = new GlobalTimeExpression(new GroupByFilter(unit, slidingStep, startTime, endTime));
+ GlobalTimeExpression timeExpression = new GlobalTimeExpression(
+ new GroupByFilter(unit, slidingStep, startTime, endTime));
if (expression == null) {
expression = timeExpression;
@@ -119,24 +130,25 @@ public class EngineQueryRouter implements IEngineQueryRouter {
IExpression optimizedExpression = ExpressionOptimizer.getInstance()
.optimize(expression, selectedSeries);
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
- GroupByWithoutValueFilterDataSet groupByEngine = new GroupByWithoutValueFilterDataSet(
- queryId, selectedSeries, unit, slidingStep, startTime, endTime);
- groupByEngine.initGroupBy(context, aggres, optimizedExpression);
+ GroupByWithoutValueFilterDataSet groupByEngine = new GroupByWithoutValueFilterDataSet(context,
+ groupByPlan);
return groupByEngine;
} else {
- GroupByWithValueFilterDataSet groupByEngine = new GroupByWithValueFilterDataSet(
- queryId, selectedSeries, unit, slidingStep, startTime, endTime);
- groupByEngine.initGroupBy(context, aggres, optimizedExpression);
+ GroupByWithValueFilterDataSet groupByEngine = new GroupByWithValueFilterDataSet(context,
+ groupByPlan);
return groupByEngine;
}
}
@Override
- public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
- QueryContext context)
+ public QueryDataSet fill(FillQueryPlan fillQueryPlan, QueryContext context)
throws StorageEngineException, QueryProcessException, IOException {
+ List<Path> fillPaths = fillQueryPlan.getDeduplicatedPaths();
+ List<TSDataType> dataTypes = fillQueryPlan.getDeduplicatedDataTypes();
+ long queryTime = fillQueryPlan.getQueryTime();
+ Map<TSDataType, IFill> fillType = fillQueryPlan.getFillType();
- FillEngineExecutor fillEngineExecutor = new FillEngineExecutor(fillPaths, queryTime,
+ FillEngineExecutor fillEngineExecutor = new FillEngineExecutor(fillPaths, dataTypes, queryTime,
fillType);
return fillEngineExecutor.execute(context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
index 761d3d8..8163c7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
@@ -38,14 +38,18 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public class FillEngineExecutor {
private List<Path> selectedSeries;
+ private List<TSDataType> dataTypes;
private long queryTime;
private Map<TSDataType, IFill> typeIFillMap;
- public FillEngineExecutor(List<Path> selectedSeries, long queryTime,
+ public FillEngineExecutor(List<Path> selectedSeries,
+ List<TSDataType> dataTypes,
+ long queryTime,
Map<TSDataType, IFill> typeIFillMap) {
this.selectedSeries = selectedSeries;
this.queryTime = queryTime;
this.typeIFillMap = typeIFillMap;
+ this.dataTypes = dataTypes;
}
/**
@@ -56,10 +60,10 @@ public class FillEngineExecutor {
public QueryDataSet execute(QueryContext context)
throws StorageEngineException, QueryProcessException, IOException {
List<IFill> fillList = new ArrayList<>();
- List<TSDataType> dataTypeList = new ArrayList<>();
- for (Path path : selectedSeries) {
- TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath());
- dataTypeList.add(dataType);
+
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ Path path = selectedSeries.get(i);
+ TSDataType dataType = dataTypes.get(i);
IFill fill;
if (!typeIFillMap.containsKey(dataType)) {
fill = new PreviousFill(dataType, queryTime, 0);
@@ -77,6 +81,6 @@ public class FillEngineExecutor {
readers.add(fill.getFillResult());
}
- return new OldEngineDataSetWithoutValueFilter(selectedSeries, dataTypeList, readers);
+ return new OldEngineDataSetWithoutValueFilter(selectedSeries, dataTypes, readers);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
index c3075cf..29db1fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
@@ -19,61 +19,40 @@
package org.apache.iotdb.db.query.executor;
+import java.io.IOException;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
public interface IEngineQueryRouter {
/**
* Execute physical plan.
*/
- QueryDataSet query(QueryExpression queryExpression, QueryContext context)
- throws StorageEngineException, PathException;
+ QueryDataSet query(QueryPlan queryPlan, QueryContext context) throws StorageEngineException;
/**
* Execute aggregation query.
*/
- QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
- IExpression expression, QueryContext context)
+ QueryDataSet aggregate(AggregationPlan aggregationPlan, QueryContext context)
throws QueryFilterOptimizationException, StorageEngineException, IOException, QueryProcessException;
/**
* Execute groupBy query.
- *
- * @param selectedSeries select path list
- * @param aggres aggregation name list
- * @param expression filter expression
- * @param unit time granularity for interval partitioning, unit is ms.
- * @param slidingStep the time sliding step, unit is ms
*/
- QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
- IExpression expression, long unit, long slidingStep, long startTime, long endTime,
- QueryContext context)
+ QueryDataSet groupBy(GroupByPlan groupByPlan, QueryContext context)
throws QueryFilterOptimizationException, StorageEngineException,
QueryProcessException, IOException;
/**
* Execute fill query.
- *
- * @param fillPaths select path list
- * @param queryTime timestamp
- * @param fillType type IFill map
*/
- QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
- QueryContext context)
+ QueryDataSet fill(FillQueryPlan fillQueryPlan, QueryContext context)
throws StorageEngineException, QueryProcessException, IOException;
-
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
index cbdbf1c..52096ea 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
@@ -27,14 +27,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -108,9 +109,15 @@ public class DeletionQueryTest {
pathList.add(new Path(processorName, measurements[3]));
pathList.add(new Path(processorName, measurements[4]));
pathList.add(new Path(processorName, measurements[5]));
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.valueOf(dataType));
+ dataTypes.add(TSDataType.valueOf(dataType));
+ dataTypes.add(TSDataType.valueOf(dataType));
- QueryExpression queryExpression = QueryExpression.create(pathList, null);
- QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT);
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ QueryDataSet dataSet = router.query(queryPlan, TEST_QUERY_CONTEXT);
int count = 0;
while (dataSet.hasNext()) {
@@ -141,8 +148,15 @@ public class DeletionQueryTest {
pathList.add(new Path(processorName, measurements[4]));
pathList.add(new Path(processorName, measurements[5]));
- QueryExpression queryExpression = QueryExpression.create(pathList, null);
- QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT);
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.valueOf(dataType));
+ dataTypes.add(TSDataType.valueOf(dataType));
+ dataTypes.add(TSDataType.valueOf(dataType));
+
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ QueryDataSet dataSet = router.query(queryPlan, TEST_QUERY_CONTEXT);
int count = 0;
while (dataSet.hasNext()) {
@@ -183,9 +197,15 @@ public class DeletionQueryTest {
pathList.add(new Path(processorName, measurements[3]));
pathList.add(new Path(processorName, measurements[4]));
pathList.add(new Path(processorName, measurements[5]));
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.valueOf(dataType));
+ dataTypes.add(TSDataType.valueOf(dataType));
+ dataTypes.add(TSDataType.valueOf(dataType));
- QueryExpression queryExpression = QueryExpression.create(pathList, null);
- QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT);
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ QueryDataSet dataSet = router.query(queryPlan, TEST_QUERY_CONTEXT);
int count = 0;
while (dataSet.hasNext()) {
@@ -227,8 +247,15 @@ public class DeletionQueryTest {
pathList.add(new Path(processorName, measurements[4]));
pathList.add(new Path(processorName, measurements[5]));
- QueryExpression queryExpression = QueryExpression.create(pathList, null);
- QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT);
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.valueOf(dataType));
+ dataTypes.add(TSDataType.valueOf(dataType));
+ dataTypes.add(TSDataType.valueOf(dataType));
+
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ QueryDataSet dataSet = router.query(queryPlan, TEST_QUERY_CONTEXT);
int count = 0;
while (dataSet.hasNext()) {
@@ -290,9 +317,15 @@ public class DeletionQueryTest {
pathList.add(new Path(processorName, measurements[3]));
pathList.add(new Path(processorName, measurements[4]));
pathList.add(new Path(processorName, measurements[5]));
-
- QueryExpression queryExpression = QueryExpression.create(pathList, null);
- QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT);
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.valueOf(dataType));
+ dataTypes.add(TSDataType.valueOf(dataType));
+ dataTypes.add(TSDataType.valueOf(dataType));
+
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ QueryDataSet dataSet = router.query(queryPlan, TEST_QUERY_CONTEXT);
int count = 0;
while (dataSet.hasNext()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
index 7d59c56..856f924 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
@@ -28,8 +28,11 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
@@ -38,6 +41,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
@@ -51,8 +55,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
/**
- * Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
- * defined as integration test. In this test case, no unseq insert data.
+ * Notice that, all test begins with "IoTDB" is integration test. All test which will start the
+ * IoTDB server should be defined as integration test. In this test case, no unseq insert data.
*/
public class IoTDBSequenceDataQueryIT {
@@ -165,19 +169,29 @@ public class IoTDBSequenceDataQueryIT {
public void readWithoutFilterTest() throws IOException, StorageEngineException {
EngineQueryRouter engineExecutor = new EngineQueryRouter();
- QueryExpression queryExpression = QueryExpression.create();
- queryExpression.addSelectedPath(new Path(Constant.d0s0));
- queryExpression.addSelectedPath(new Path(Constant.d0s1));
- queryExpression.addSelectedPath(new Path(Constant.d0s2));
- queryExpression.addSelectedPath(new Path(Constant.d0s3));
- queryExpression.addSelectedPath(new Path(Constant.d0s4));
- queryExpression.addSelectedPath(new Path(Constant.d1s0));
- queryExpression.addSelectedPath(new Path(Constant.d1s1));
- queryExpression.setExpression(null);
+ List<Path> pathList = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ pathList.add(new Path(Constant.d0s0));
+ dataTypes.add(TSDataType.INT32);
+ pathList.add(new Path(Constant.d0s1));
+ dataTypes.add(TSDataType.INT64);
+ pathList.add(new Path(Constant.d0s2));
+ dataTypes.add(TSDataType.FLOAT);
+ pathList.add(new Path(Constant.d0s3));
+ dataTypes.add(TSDataType.TEXT);
+ pathList.add(new Path(Constant.d0s4));
+ dataTypes.add(TSDataType.BOOLEAN);
+ pathList.add(new Path(Constant.d1s0));
+ dataTypes.add(TSDataType.INT32);
+ pathList.add(new Path(Constant.d1s1));
+ dataTypes.add(TSDataType.INT64);
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
- QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT);
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ QueryDataSet queryDataSet = engineExecutor.query(queryPlan, TEST_QUERY_CONTEXT);
int cnt = 0;
while (queryDataSet.hasNext()) {
@@ -193,16 +207,24 @@ public class IoTDBSequenceDataQueryIT {
@Test
public void readWithTimeFilterTest() throws IOException, StorageEngineException {
EngineQueryRouter engineExecutor = new EngineQueryRouter();
- QueryExpression queryExpression = QueryExpression.create();
- queryExpression.addSelectedPath(new Path(Constant.d0s0));
- queryExpression.addSelectedPath(new Path(Constant.d1s0));
- queryExpression.addSelectedPath(new Path(Constant.d1s1));
+ List<Path> pathList = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ pathList.add(new Path(Constant.d0s0));
+ dataTypes.add(TSDataType.INT32);
+ pathList.add(new Path(Constant.d1s0));
+ dataTypes.add(TSDataType.INT32);
+ pathList.add(new Path(Constant.d1s1));
+ dataTypes.add(TSDataType.INT64);
GlobalTimeExpression globalTimeExpression = new GlobalTimeExpression(TimeFilter.gtEq(800L));
- queryExpression.setExpression(globalTimeExpression);
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
- QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT);
+
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ queryPlan.setExpression(globalTimeExpression);
+ QueryDataSet queryDataSet = engineExecutor.query(queryPlan, TEST_QUERY_CONTEXT);
int cnt = 0;
while (queryDataSet.hasNext()) {
@@ -222,23 +244,35 @@ public class IoTDBSequenceDataQueryIT {
public void readWithValueFilterTest() throws IOException, StorageEngineException {
// select * from root where root.vehicle.d0.s0 >=14
EngineQueryRouter engineExecutor = new EngineQueryRouter();
- QueryExpression queryExpression = QueryExpression.create();
- queryExpression.addSelectedPath(new Path(Constant.d0s0));
- queryExpression.addSelectedPath(new Path(Constant.d0s1));
- queryExpression.addSelectedPath(new Path(Constant.d0s2));
- queryExpression.addSelectedPath(new Path(Constant.d0s3));
- queryExpression.addSelectedPath(new Path(Constant.d0s4));
- queryExpression.addSelectedPath(new Path(Constant.d1s0));
- queryExpression.addSelectedPath(new Path(Constant.d1s1));
+ List<Path> pathList = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ pathList.add(new Path(Constant.d0s0));
+ dataTypes.add(TSDataType.INT32);
+ pathList.add(new Path(Constant.d0s1));
+ dataTypes.add(TSDataType.INT64);
+ pathList.add(new Path(Constant.d0s2));
+ dataTypes.add(TSDataType.FLOAT);
+ pathList.add(new Path(Constant.d0s3));
+ dataTypes.add(TSDataType.TEXT);
+ pathList.add(new Path(Constant.d0s4));
+ dataTypes.add(TSDataType.BOOLEAN);
+ pathList.add(new Path(Constant.d1s0));
+ dataTypes.add(TSDataType.INT32);
+ pathList.add(new Path(Constant.d1s1));
+ dataTypes.add(TSDataType.INT64);
Path queryPath = new Path(Constant.d0s0);
SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(queryPath,
ValueFilter.gtEq(14));
- queryExpression.setExpression(singleSeriesExpression);
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
- QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT);
+
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ queryPlan.setExpression(singleSeriesExpression);
+ QueryDataSet queryDataSet = engineExecutor.query(queryPlan, TEST_QUERY_CONTEXT);
int cnt = 0;
while (queryDataSet.hasNext()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index 125fe24..adf68b8 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -30,8 +30,12 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
@@ -40,6 +44,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
@@ -52,8 +57,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
/**
- * Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
- * defined as integration test.
+ * Notice that, all test begins with "IoTDB" is integration test. All test which will start the
+ * IoTDB server should be defined as integration test.
*/
public class IoTDBSeriesReaderIT {
@@ -107,8 +112,7 @@ public class IoTDBSeriesReaderIT {
Class.forName(Config.JDBC_DRIVER_NAME);
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement() ) {
-
+ Statement statement = connection.createStatement()) {
for (String sql : Constant.create_sql) {
statement.execute(sql);
@@ -245,20 +249,32 @@ public class IoTDBSeriesReaderIT {
//System.out.println("Test >>> " + selectSql);
EngineQueryRouter engineExecutor = new EngineQueryRouter();
- QueryExpression queryExpression = QueryExpression.create();
- queryExpression.addSelectedPath(new Path(Constant.d0s0));
- queryExpression.addSelectedPath(new Path(Constant.d0s1));
- queryExpression.addSelectedPath(new Path(Constant.d0s2));
- queryExpression.addSelectedPath(new Path(Constant.d0s3));
- queryExpression.addSelectedPath(new Path(Constant.d0s4));
- queryExpression.addSelectedPath(new Path(Constant.d0s5));
- queryExpression.addSelectedPath(new Path(Constant.d1s0));
- queryExpression.addSelectedPath(new Path(Constant.d1s1));
- queryExpression.setExpression(null);
+ List<Path> pathList = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ pathList.add(new Path(Constant.d0s0));
+ dataTypes.add(TSDataType.INT32);
+ pathList.add(new Path(Constant.d0s1));
+ dataTypes.add(TSDataType.INT64);
+ pathList.add(new Path(Constant.d0s2));
+ dataTypes.add(TSDataType.FLOAT);
+ pathList.add(new Path(Constant.d0s3));
+ dataTypes.add(TSDataType.TEXT);
+ pathList.add(new Path(Constant.d0s4));
+ dataTypes.add(TSDataType.BOOLEAN);
+ pathList.add(new Path(Constant.d0s5));
+ dataTypes.add(TSDataType.DOUBLE);
+ pathList.add(new Path(Constant.d1s0));
+ dataTypes.add(TSDataType.INT32);
+ pathList.add(new Path(Constant.d1s1));
+ dataTypes.add(TSDataType.INT64);
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
- QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT);
+
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ QueryDataSet queryDataSet = engineExecutor.query(queryPlan, TEST_QUERY_CONTEXT);
int cnt = 0;
while (queryDataSet.hasNext()) {
@@ -277,16 +293,22 @@ public class IoTDBSeriesReaderIT {
//System.out.println("Test >>> " + selectSql);
EngineQueryRouter engineExecutor = new EngineQueryRouter();
- QueryExpression queryExpression = QueryExpression.create();
+ List<Path> pathList = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
Path p = new Path(Constant.d0s0);
- queryExpression.addSelectedPath(p);
+ pathList.add(p);
+ dataTypes.add(TSDataType.INT32);
SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(p,
ValueFilter.gtEq(20));
- queryExpression.setExpression(singleSeriesExpression);
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
- QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT);
+
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ queryPlan.setExpression(singleSeriesExpression);
+ QueryDataSet queryDataSet = engineExecutor.query(queryPlan, TEST_QUERY_CONTEXT);
int cnt = 0;
while (queryDataSet.hasNext()) {
@@ -306,15 +328,18 @@ public class IoTDBSeriesReaderIT {
//System.out.println("Test >>> " + selectSql);
EngineQueryRouter engineExecutor = new EngineQueryRouter();
- QueryExpression queryExpression = QueryExpression.create();
Path path = new Path(Constant.d0s0);
- queryExpression.addSelectedPath(path);
+ List<TSDataType> dataTypes = Collections.singletonList(TSDataType.INT32);
SingleSeriesExpression expression = new SingleSeriesExpression(path, TimeFilter.gt(22987L));
- queryExpression.setExpression(expression);
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
- QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT);
+
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(Collections.singletonList(path));
+ queryPlan.setExpression(expression);
+ QueryDataSet queryDataSet = engineExecutor.query(queryPlan, TEST_QUERY_CONTEXT);
int cnt = 0;
while (queryDataSet.hasNext()) {
@@ -332,18 +357,25 @@ public class IoTDBSeriesReaderIT {
public void crossSeriesReadUpdateTest() throws IOException, StorageEngineException {
//System.out.println("Test >>> select s1 from root.vehicle.d0 where s0 < 111");
EngineQueryRouter engineExecutor = new EngineQueryRouter();
- QueryExpression queryExpression = QueryExpression.create();
Path path1 = new Path(Constant.d0s0);
Path path2 = new Path(Constant.d0s1);
- queryExpression.addSelectedPath(path1);
- queryExpression.addSelectedPath(path2);
SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(path1,
ValueFilter.lt(111));
- queryExpression.setExpression(singleSeriesExpression);
+ List<Path> pathList = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ pathList.add(path1);
+ dataTypes.add(TSDataType.INT32);
+ pathList.add(path2);
+ dataTypes.add(TSDataType.INT64);
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true);
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
- QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT);
+
+ QueryPlan queryPlan = new QueryPlan();
+ queryPlan.setDeduplicatedDataTypes(dataTypes);
+ queryPlan.setDeduplicatedPaths(pathList);
+ queryPlan.setExpression(singleSeriesExpression);
+ QueryDataSet queryDataSet = engineExecutor.query(queryPlan, TEST_QUERY_CONTEXT);
int cnt = 0;
while (queryDataSet.hasNext()) {
@@ -358,9 +390,10 @@ public class IoTDBSeriesReaderIT {
@Test
public void queryEmptySeriesTest() throws SQLException {
Statement statement = connection.createStatement();
- statement.execute("CREATE TIMESERIES root.vehicle.d_empty.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+ statement
+ .execute("CREATE TIMESERIES root.vehicle.d_empty.s1 WITH DATATYPE=INT64, ENCODING=RLE");
ResultSet resultSet = statement.executeQuery("select * from root.vehicle.d_empty");
- assertFalse (resultSet.next());
+ assertFalse(resultSet.next());
resultSet.close();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index 2e73c6d..f8e03f2 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -25,8 +25,11 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
@@ -100,25 +103,19 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
}
@Override
- public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
- QueryContext context)
- throws PathException, IOException, StorageEngineException,
- QueryFilterOptimizationException {
+ public QueryDataSet aggregate(AggregationPlan aggregationPlan, QueryContext context)
+ throws PathException, IOException, StorageEngineException, QueryFilterOptimizationException {
return null;
}
@Override
- public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
- long unit, long slidingStep, long startTime, long endTime,
- QueryContext context)
- throws IOException, PathException,
- StorageEngineException, QueryFilterOptimizationException {
+ public QueryDataSet groupBy(GroupByPlan groupByPlan, QueryContext context)
+ throws IOException, PathException, StorageEngineException, QueryFilterOptimizationException {
return null;
}
@Override
- public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
- QueryContext context)
+ public QueryDataSet fill(FillQueryPlan fillQueryPlan, QueryContext context)
throws IOException, PathException, StorageEngineException {
return null;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
index 802a9e6..8f3aa51 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
@@ -18,18 +18,23 @@
*/
package org.apache.iotdb.db.query.executor;
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.path.PathException;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.query.aggregation.impl.CountAggrFunc;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-
public class GroupByEngineDataSetTest {
@Test
- public void test1() throws IOException {
+ public void test1() throws IOException, PathException, StorageEngineException {
long queryId = 1000L;
long unit = 3;
long slidingStep = 5;
@@ -38,7 +43,14 @@ public class GroupByEngineDataSetTest {
long[] startTimeArray = {8, 13, 18, 23, 28};
long[] endTimeArray = {11, 16, 21, 26, 31};
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, null, unit, slidingStep, startTime, endTime);
+
+ GroupByPlan groupByPlan = new GroupByPlan();
+ groupByPlan.setUnit(unit);
+ groupByPlan.setSlidingStep(slidingStep);
+ groupByPlan.setStartTime(startTime);
+ groupByPlan.setEndTime(endTime);
+
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByPlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
Pair pair = groupByEngine.nextTimePartition();
@@ -51,7 +63,7 @@ public class GroupByEngineDataSetTest {
}
@Test
- public void test2() throws IOException {
+ public void test2() throws IOException, PathException, StorageEngineException {
long queryId = 1000L;
long unit = 3;
long slidingStep = 5;
@@ -60,7 +72,13 @@ public class GroupByEngineDataSetTest {
long[] startTimeArray = {8, 13, 18, 23, 28};
long[] endTimeArray = {11, 16, 21, 26, 31};
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, null, unit, slidingStep, startTime, endTime);
+
+ GroupByPlan groupByPlan = new GroupByPlan();
+ groupByPlan.setUnit(unit);
+ groupByPlan.setSlidingStep(slidingStep);
+ groupByPlan.setStartTime(startTime);
+ groupByPlan.setEndTime(endTime);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByPlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
Pair pair = groupByEngine.nextTimePartition();
@@ -73,7 +91,7 @@ public class GroupByEngineDataSetTest {
}
@Test
- public void test3() throws IOException {
+ public void test3() throws IOException, PathException, StorageEngineException {
long queryId = 1000L;
long unit = 3;
long slidingStep = 3;
@@ -82,7 +100,13 @@ public class GroupByEngineDataSetTest {
long[] startTimeArray = {8, 11, 14, 17, 20, 23};
long[] endTimeArray = {11, 14, 17, 20, 23, 24};
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, null, unit, slidingStep, startTime, endTime);
+
+ GroupByPlan groupByPlan = new GroupByPlan();
+ groupByPlan.setUnit(unit);
+ groupByPlan.setSlidingStep(slidingStep);
+ groupByPlan.setStartTime(startTime);
+ groupByPlan.setEndTime(endTime);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByPlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
Pair pair = groupByEngine.nextTimePartition();
@@ -95,7 +119,7 @@ public class GroupByEngineDataSetTest {
}
@Test
- public void test4() throws IOException {
+ public void test4() throws IOException, PathException, StorageEngineException {
long queryId = 1000L;
long unit = 3;
long slidingStep = 3;
@@ -104,7 +128,13 @@ public class GroupByEngineDataSetTest {
long[] startTimeArray = {8, 11, 14, 17, 20};
long[] endTimeArray = {11, 14, 17, 20, 23};
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, null, unit, slidingStep, startTime, endTime);
+
+ GroupByPlan groupByPlan = new GroupByPlan();
+ groupByPlan.setUnit(unit);
+ groupByPlan.setSlidingStep(slidingStep);
+ groupByPlan.setStartTime(startTime);
+ groupByPlan.setEndTime(endTime);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByPlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
Pair pair = groupByEngine.nextTimePartition();
@@ -117,7 +147,7 @@ public class GroupByEngineDataSetTest {
}
@Test
- public void test5() throws IOException {
+ public void test5() throws IOException, PathException, StorageEngineException {
long queryId = 1000L;
long unit = 3;
long slidingStep = 3;
@@ -126,7 +156,15 @@ public class GroupByEngineDataSetTest {
long[] startTimeArray = {8, 11, 14, 17, 20, 23};
long[] endTimeArray = {11, 14, 17, 20, 23, 25};
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, null, unit, slidingStep, startTime, endTime);
+
+ GroupByPlan groupByPlan = new GroupByPlan();
+ groupByPlan.setUnit(unit);
+ groupByPlan.setSlidingStep(slidingStep);
+ groupByPlan.setStartTime(startTime);
+ groupByPlan.setEndTime(endTime);
+ ArrayList<Object> aggrList = new ArrayList<>();
+ aggrList.add(new CountAggrFunc());
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByPlan);
int cnt = 0;
while (groupByEngine.hasNext()) {
Pair pair = groupByEngine.nextTimePartition();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
index dc876d4..9d068fc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
@@ -20,11 +20,13 @@ package org.apache.iotdb.tsfile.read.expression;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
public class QueryExpression {
private List<Path> selectedSeries;
+ private List<TSDataType> dataTypes;
private IExpression expression;
private boolean hasQueryFilter;
@@ -37,7 +39,8 @@ public class QueryExpression {
return new QueryExpression();
}
- public static QueryExpression create(List<Path> selectedSeries, IExpression expression) {
+ public static QueryExpression create(List<Path> selectedSeries,
+ IExpression expression) {
QueryExpression ret = new QueryExpression();
ret.selectedSeries = selectedSeries;
ret.expression = expression;
@@ -74,11 +77,21 @@ public class QueryExpression {
@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder("\n\t[Selected Series]:").append(selectedSeries)
- .append("\n\t[expression]:").append(expression);
+ .append("\n\t[TSDataType]:").append(dataTypes).append("\n\t[expression]:")
+ .append(expression);
return stringBuilder.toString();
}
public boolean hasQueryFilter() {
return hasQueryFilter;
}
+
+ public List<TSDataType> getDataTypes() {
+ return dataTypes;
+ }
+
+ public QueryExpression setDataTypes(List<TSDataType> dataTypes) {
+ this.dataTypes = dataTypes;
+ return this;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index e8419d0..4fd3d11 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -41,10 +41,6 @@ public abstract class QueryDataSet {
this.dataTypes = dataTypes;
}
- public QueryDataSet(List<Path> paths) {
- this.paths = paths;
- }
-
public boolean hasNext() throws IOException {
// proceed to the OFFSET row by skipping rows
while (rowOffset > 0) {