You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/10/28 14:25:31 UTC
[iotdb] branch research/quality-validity updated: update validity aggregation (#7783)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch research/quality-validity
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/quality-validity by this push:
new 52d35570b0 update validity aggregation (#7783)
52d35570b0 is described below
commit 52d35570b05321a32d61eee1dea30150c0218966
Author: iotdbValidity <99...@users.noreply.github.com>
AuthorDate: Fri Oct 28 22:25:17 2022 +0800
update validity aggregation (#7783)
update validity aggregation
---
.../resources/conf/iotdb-engine.properties | 17 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 28 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 56 +
.../apache/iotdb/db/cq/ContinuousQueryTask.java | 2 +-
.../trigger/sink/local/LocalIoTDBHandler.java | 4 +-
.../apache/iotdb/db/qp/constant/SQLConstant.java | 6 +-
.../qp/logical/crud/AggregationQueryOperator.java | 3 +-
.../iotdb/db/qp/logical/crud/QueryOperator.java | 2 +-
.../iotdb/db/qp/logical/crud/SelectComponent.java | 1 +
.../db/query/aggregation/AggregateResult.java | 2 +
.../db/query/aggregation/AggregationType.java | 14 +-
.../db/query/aggregation/impl/CountAggrResult.java | 9 +-
.../query/aggregation/impl/ValidityAggrResult.java | 177 ++
.../aggregation/impl/ValidityAllAggrResult.java | 165 ++
.../db/query/executor/AggregationExecutor.java | 553 ++++-
.../db/query/factory/AggregateResultFactory.java | 12 +
.../db/query/filter/executor/IPlanExecutor.java | 134 ++
.../db/query/filter/executor/PlanExecutor.java | 2176 ++++++++++++++++++++
.../db/service/basic/BasicServiceProvider.java | 4 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 2 +-
.../idtable/IDTableResourceControlTest.java | 2 +-
.../db/metadata/idtable/IDTableRestartTest.java | 2 +-
.../db/metadata/idtable/InsertWithIDTableTest.java | 2 +-
.../db/metadata/idtable/LastQueryWithIDTable.java | 2 +-
.../QueryAlignedTimeseriesWithIDTableTest.java | 2 +-
.../db/metadata/idtable/QueryWithIDTableTest.java | 2 +-
.../iotdb/db/qp/physical/InsertRowPlanTest.java | 2 +-
.../db/qp/physical/InsertTabletMultiPlanTest.java | 2 +-
.../iotdb/db/qp/physical/InsertTabletPlanTest.java | 2 +-
.../db/query/aggregation/AggregateResultTest.java | 32 +
.../query/aggregation/DescAggregateResultTest.java | 173 +-
.../dataset/EngineDataSetWithValueFilterTest.java | 4 +-
.../iotdb/db/query/dataset/ListDataSetTest.java | 4 +-
.../iotdb/db/query/dataset/SingleDataSetTest.java | 4 +-
.../query/dataset/UDTFAlignByTimeDataSetTest.java | 4 +-
.../dataset/groupby/GroupByFillDataSetTest.java | 4 +-
.../dataset/groupby/GroupByLevelDataSetTest.java | 4 +-
.../dataset/groupby/GroupByTimeDataSetTest.java | 4 +-
.../valuefilter/RawQueryWithValueFilterTest.java | 4 +-
.../iotdb/db/utils/TsFileRewriteToolTest.java | 4 +-
40 files changed, 3421 insertions(+), 205 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 13becfd599..acf9325cd5 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -236,7 +236,7 @@ timestamp_precision=ms
# Whether to timed flush unsequence tsfiles' memtables.
# Datatype: boolean
-# enable_timed_flush_unseq_memtable=true
+enable_timed_flush_unseq_memtable=false
# If a memTable's created time is older than current time minus this, the memtable will be flushed to disk.
# Only check unsequence tsfiles' memtables.
@@ -418,15 +418,15 @@ timestamp_precision=ms
####################
# sequence space compaction: only compact the sequence files
# Datatype: boolean
-# enable_seq_space_compaction=true
+enable_seq_space_compaction=false
# unsequence space compaction: only compact the unsequence files
# Datatype: boolean
-# enable_unseq_space_compaction=true
+enable_unseq_space_compaction=false
# cross space compaction: compact the unsequence files into the overlapped sequence files
# Datatype: boolean
-# enable_cross_space_compaction=true
+enable_cross_space_compaction=false
# the strategy of inner space compaction task
# Options: inplace_compaction
@@ -721,9 +721,12 @@ timestamp_precision=ms
# Datatype: int [xsy]
# page_size_in_byte=65536
+# xMax = 10000000
+# xMin = -1000000
+
# The maximum number of data points in a page, default 1024*1024
# Datatype: int [xsy]
-# max_number_of_points_in_page=1048576
+max_number_of_points_in_page=1024
# Max size limitation of input string
# Datatype: int [xsy]
@@ -953,4 +956,6 @@ timestamp_precision=ms
### Group By Fill Configuration
####################
# Datatype: float
-# group_by_fill_cache_size_in_mb=1.0
\ No newline at end of file
+# group_by_fill_cache_size_in_mb=1.0
+
+c = 100
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4842bb27e6..487beb299f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -802,10 +802,38 @@ public class IoTDBConfig {
*/
private boolean enableIDTableLogFile = false;
+ private int c = 100;
+ private double smin = -999;
+ private double smax = 999;
+
public IoTDBConfig() {
// empty constructor
}
+ public int getParamC() {
+ return c;
+ }
+
+ public void setParamC(int c) {
+ this.c = c;
+ }
+
+ public double getsmin() {
+ return smin;
+ }
+
+ public void setsmin(double smin) {
+ this.smin = smin;
+ }
+
+ public double getsmax() {
+ return smax;
+ }
+
+ public void setsmax(double smax) {
+ this.smax = smax;
+ }
+
public float getUdfMemoryBudgetInMB() {
return udfMemoryBudgetInMB;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 849fddbdb6..8216e48229 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -774,6 +774,13 @@ public class IoTDBDescriptor {
"insert_multi_tablet_enable_multithreading_column_threshold",
String.valueOf(conf.getInsertMultiTabletEnableMultithreadingColumnThreshold()))));
+ conf.setParamC(
+ Integer.parseInt(properties.getProperty("c", Integer.toString(conf.getParamC()))));
+ conf.setsmin(
+ Double.parseDouble(properties.getProperty("smin", Double.toString(conf.getsmin()))));
+ conf.setsmax(
+ Double.parseDouble(properties.getProperty("smax", Double.toString(conf.getsmax()))));
+
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance()
.getConfig()
@@ -974,6 +981,55 @@ public class IoTDBDescriptor {
"group_size_in_byte",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setXMax(
+ Double.parseDouble(
+ properties.getProperty(
+ "xMax",
+ Double.toString(TSFileDescriptor.getInstance().getConfig().getXMax()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setXMin(
+ Double.parseDouble(
+ properties.getProperty(
+ "xMin",
+ Double.toString(TSFileDescriptor.getInstance().getConfig().getXMin()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setsMax(
+ Double.parseDouble(
+ properties.getProperty(
+ "sMax",
+ Double.toString(TSFileDescriptor.getInstance().getConfig().getsMax()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setSmin(
+ Double.parseDouble(
+ properties.getProperty(
+ "sMin",
+ Double.toString(TSFileDescriptor.getInstance().getConfig().getSmin()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setMussRate(
+ Double.parseDouble(
+ properties.getProperty(
+ "mussRate",
+ Double.toString(TSFileDescriptor.getInstance().getConfig().getMussRate()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setUsePreSpeed(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "usePreSpeed",
+ Boolean.toString(TSFileDescriptor.getInstance().getConfig().isUsePreSpeed()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setUsePreRange(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "usePreRange",
+ Boolean.toString(TSFileDescriptor.getInstance().getConfig().isUsePreRange()))));
TSFileDescriptor.getInstance()
.getConfig()
.setPageSizeInByte(
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
index 154c284fe8..6216487e5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.logical.crud.GroupByClauseComponent;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.logical.crud.SelectComponent;
@@ -34,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
index 3b224b20e5..b4f40e1f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
@@ -27,10 +27,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index ce111ab37a..a7bb68583c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -70,6 +70,8 @@ public class SQLConstant {
public static final String COUNT = "count";
public static final String AVG = "avg";
public static final String SUM = "sum";
+ public static final String VALIDITY = "validity";
+ public static final String VALIDITYALL = "validityall";
public static final String ALL = "all";
@@ -85,7 +87,9 @@ public class SQLConstant {
LAST_VALUE,
COUNT,
SUM,
- AVG));
+ AVG,
+ VALIDITY,
+ VALIDITYALL));
public static final int TOK_WHERE = 23;
public static final int TOK_INSERT = 24;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
index 36719c71ec..560fb23a15 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
@@ -125,7 +125,8 @@ public class AggregationQueryOperator extends QueryOperator {
protected AggregationPlan initAggregationPlan(QueryPlan queryPlan) throws QueryProcessException {
AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
- aggregationPlan.setAggregations(selectComponent.getAggregationFunctions());
+ aggregationPlan.setAggregations(selectComponent.getAggregationFunctions()); // function name
+ // TODO: aggregationPlan.setParams(selectComponent.getParams)
if (isGroupByLevel()) {
initGroupByLevel(aggregationPlan);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 4653ed6794..5f41e24902 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -60,7 +60,7 @@ public class QueryOperator extends Operator {
protected WhereComponent whereComponent;
protected SpecialClauseComponent specialClauseComponent;
- protected Map<String, Object> props;
+ protected Map<String, Object> props; // parameters
protected IndexType indexType;
protected boolean enableTracing;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
index f06d66158c..397f99f147 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
@@ -128,6 +128,7 @@ public final class SelectComponent {
expression instanceof FunctionExpression
? ((FunctionExpression) resultColumn.getExpression()).getFunctionName()
: null);
+ // TODO: resultColumn.getExpression().getFunctionAttributes()(
}
}
return aggregationFunctionsCache;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7daa1e8d60..0b9e27b61c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -67,6 +67,7 @@ public abstract class AggregateResult {
*
* @param statistics chunkStatistics or pageStatistics
*/
+ // update data by headers
public abstract void updateResultFromStatistics(Statistics statistics)
throws QueryProcessException;
@@ -76,6 +77,7 @@ public abstract class AggregateResult {
*
* @param batchIterator the data in Page
*/
+ // update by original data
public abstract void updateResultFromPageData(IBatchDataIterator batchIterator)
throws IOException, QueryProcessException;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
index a3c651835c..ad6632da1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
@@ -35,7 +35,9 @@ public enum AggregationType {
MIN_TIME,
MAX_VALUE,
MIN_VALUE,
- EXTREME;
+ EXTREME,
+ VALIDITY,
+ VALIDITYALL;
/**
* give an integer to return a data type.
@@ -65,6 +67,10 @@ public enum AggregationType {
return MIN_VALUE;
case 9:
return EXTREME;
+ case 10:
+ return VALIDITY;
+ case 11:
+ return VALIDITYALL;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + i);
}
@@ -103,6 +109,12 @@ public enum AggregationType {
case EXTREME:
i = 9;
break;
+ case VALIDITY:
+ i = 10;
+ break;
+ case VALIDITYALL:
+ i = 11;
+ break;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + this.name());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index ec279729f9..84a25ab51f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -32,6 +33,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
public class CountAggrResult extends AggregateResult {
+ private Statistics statisticsInstance = new DoubleStatistics();
public CountAggrResult() {
super(TSDataType.INT64, AggregationType.COUNT);
@@ -41,12 +43,15 @@ public class CountAggrResult extends AggregateResult {
@Override
public Long getResult() {
- return getLongValue();
+ if (statisticsInstance.getCount() > 0) {
+ setLongValue((long) statisticsInstance.getValidityErrors());
+ }
+ return hasCandidateResult() ? getLongValue() : null;
}
@Override
public void updateResultFromStatistics(Statistics statistics) {
- setLongValue(getLongValue() + statistics.getCount());
+ setLongValue(getLongValue() + statistics.getValidityErrors());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAggrResult.java
new file mode 100644
index 0000000000..a3cc39b039
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAggrResult.java
@@ -0,0 +1,177 @@
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
+import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class ValidityAggrResult extends AggregateResult {
+ private TSDataType seriesDataType;
+ private double validity = 0.0;
+ private double sum = 0.0;
+ private long cnt = 0;
+ private Statistics statisticsInstance = new DoubleStatistics();
+
+ public ValidityAggrResult(TSDataType seriesDataType) {
+ super(TSDataType.DOUBLE, AggregationType.VALIDITY);
+ this.seriesDataType = seriesDataType;
+ reset();
+ }
+
+ @Override
+ public Double getResult() {
+ if (statisticsInstance.getCount() > 0) {
+ setDoubleValue(statisticsInstance.getValidity());
+ }
+ return hasCandidateResult() ? getDoubleValue() : null;
+ }
+
+ @Override
+ public void updateResultFromStatistics(Statistics statistics) {
+ double speedBetween = 0;
+ // skip empty statistics
+ if (statistics.getCount() == 0) {
+ return;
+ }
+ // update intermediate results from statistics
+ // must be sure no overlap between two statistics
+ if (statistics instanceof DoubleStatistics) {
+ statisticsInstance.mergeStatistics(statistics);
+ } else {
+ throw new StatisticsClassException("Does not support: validity");
+ }
+ setDoubleValue(statisticsInstance.getValidity());
+ }
+
+ @Override
+ public void updateResultFromPageData(IBatchDataIterator batchIterator)
+ throws IOException, QueryProcessException {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ }
+
+ public void updateDPAndReverseDPAll() {
+ if (statisticsInstance.getTimeWindow().size() == 0) {
+ return;
+ }
+ statisticsInstance.optUpdateDPandReverseDP();
+ }
+
+ @Override
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) throws IOException {
+ while (batchIterator.hasNext(minBound, maxBound)) {
+ if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
+ break;
+ }
+ statisticsInstance.update(batchIterator.currentTime(), (double) batchIterator.currentValue());
+ batchIterator.next();
+ }
+ }
+
+ @Override
+ public void updateResultUsingTimestamps(
+ long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ statisticsInstance.update(timestamps[i], (double) values[i]);
+ }
+ }
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+ int i = 0;
+ while (valueIterator.hasNext() && i < length) {
+ statisticsInstance.update(timestamps[i], (double) valueIterator.next());
+ i++;
+ }
+ }
+
+ @Override
+ // TODO:修改判断
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public void merge(AggregateResult another) {
+ // TODO: merge need be updated
+ ValidityAggrResult anotherVar = (ValidityAggrResult) another;
+ // skip empty results
+ if (anotherVar.getStatisticsInstance().getCount() == 0) {
+ return;
+ }
+ validity +=
+ (anotherVar.getResult() * anotherVar.getStatisticsInstance().getCount()
+ + validity * statisticsInstance.getCount())
+ / (anotherVar.getStatisticsInstance().getCount() + statisticsInstance.getCount());
+ setDoubleValue(validity);
+ }
+
+ @Override
+ protected void deserializeSpecificFields(ByteBuffer buffer) {
+ this.seriesDataType = TSDataType.deserialize(buffer.get());
+ this.validity = buffer.getDouble();
+ this.sum = buffer.getDouble();
+ this.cnt = buffer.getLong();
+ }
+
+ @Override
+ protected void serializeSpecificFields(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(seriesDataType, outputStream);
+ ReadWriteIOUtils.write(validity, outputStream);
+ ReadWriteIOUtils.write(sum, outputStream);
+ ReadWriteIOUtils.write(cnt, outputStream);
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ validity = 0.0;
+ sum = 0.0;
+ cnt = 0L;
+ statisticsInstance = new DoubleStatistics();
+ }
+
+ public void setValidityErrors(int errors) {
+ this.statisticsInstance.setValidityErrors(errors);
+ this.statisticsInstance.setRepairSelfLast(true);
+ this.statisticsInstance.setRepairSelfFirst(true);
+ }
+
+ public double getValidity() {
+ return validity;
+ }
+
+ public double getSum() {
+ return sum;
+ }
+
+ public long getCnt() {
+ return cnt;
+ }
+
+ public Statistics getStatisticsInstance() {
+ return statisticsInstance;
+ }
+
+ public void setStatisticsInstance(Statistics statisticsInstance) {
+ this.statisticsInstance = statisticsInstance;
+ }
+
+ public boolean checkMergeable(Statistics statistics) {
+ return statisticsInstance.checkMergeable(statistics);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAllAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAllAggrResult.java
new file mode 100644
index 0000000000..426ecd5d7a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAllAggrResult.java
@@ -0,0 +1,165 @@
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
+import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+// TODO: update
+public class ValidityAllAggrResult extends AggregateResult {
+ private TSDataType seriesDataType;
+ private double validity = 0.0;
+ private double sum = 0.0;
+ private long cnt = 0;
+ private Statistics statisticsInstance = new DoubleStatistics();
+
+ public ValidityAllAggrResult(TSDataType seriesDataType) {
+ super(TSDataType.DOUBLE, AggregationType.VALIDITYALL);
+ this.seriesDataType = seriesDataType;
+ reset();
+ }
+
+ @Override
+ public Double getResult() {
+ if (statisticsInstance.getCount() > 0) {
+ setDoubleValue(statisticsInstance.getValidity());
+ }
+ return hasCandidateResult() ? getDoubleValue() : null;
+ }
+
+ @Override
+ public void updateResultFromStatistics(Statistics statistics) {
+ double speedBetween = 0;
+ // skip empty statistics
+ if (statistics.getCount() == 0) {
+ return;
+ }
+ // update intermediate results from statistics
+ // must be sure no overlap between two statistics
+ if (statistics instanceof DoubleStatistics) {
+ statisticsInstance.mergeStatistics(statistics);
+ } else {
+ throw new StatisticsClassException("Does not support: validityall");
+ }
+ setDoubleValue(statisticsInstance.getValidity());
+ }
+
+ @Override
+ public void updateResultFromPageData(IBatchDataIterator batchIterator)
+ throws IOException, QueryProcessException {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ }
+
+ public void updateDPAndReverseDP() {
+ statisticsInstance.optUpdateDPandReverseDP();
+ }
+
+ @Override
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) throws IOException {
+ while (batchIterator.hasNext(minBound, maxBound)) {
+ if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
+ break;
+ }
+ statisticsInstance.updateAll(
+ batchIterator.currentTime(), (double) batchIterator.currentValue());
+ batchIterator.next();
+ }
+ }
+
+ @Override
+ public void updateResultUsingTimestamps(
+ long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ statisticsInstance.updateAll(timestamps[i], (double) values[i]);
+ }
+ }
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+ int i = 0;
+ while (valueIterator.hasNext() && i < length) {
+ statisticsInstance.updateAll(timestamps[i], (double) valueIterator.next());
+ i++;
+ }
+ }
+
+ @Override
+ // TODO:修改判断
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public void merge(AggregateResult another) {
+ // TODO: merge need be updated
+ ValidityAggrResult anotherVar = (ValidityAggrResult) another;
+ // skip empty results
+ if (anotherVar.getStatisticsInstance().getCount() == 0) {
+ return;
+ }
+ validity +=
+ (anotherVar.getResult() * anotherVar.getStatisticsInstance().getCount()
+ + validity * statisticsInstance.getCount())
+ / (anotherVar.getStatisticsInstance().getCount() + statisticsInstance.getCount());
+ setDoubleValue(validity);
+ }
+
+ @Override
+ protected void deserializeSpecificFields(ByteBuffer buffer) {
+ this.seriesDataType = TSDataType.deserialize(buffer.get());
+ this.validity = buffer.getDouble();
+ this.sum = buffer.getDouble();
+ this.cnt = buffer.getLong();
+ }
+
+ @Override
+ protected void serializeSpecificFields(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(seriesDataType, outputStream);
+ ReadWriteIOUtils.write(validity, outputStream);
+ ReadWriteIOUtils.write(sum, outputStream);
+ ReadWriteIOUtils.write(cnt, outputStream);
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ validity = 0.0;
+ sum = 0.0;
+ cnt = 0L;
+ }
+
+ public double getValidity() {
+ return validity;
+ }
+
+ public double getSum() {
+ return sum;
+ }
+
+ public long getCnt() {
+ return cnt;
+ }
+
+ public Statistics getStatisticsInstance() {
+ return statisticsInstance;
+ }
+
+ public boolean checkMergeable(Statistics statistics) {
+ return statisticsInstance.checkMergeable(statistics);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index fc33f2639a..6ae35f6674 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -34,6 +34,9 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.aggregation.impl.ValidityAggrResult;
+import org.apache.iotdb.db.query.aggregation.impl.ValidityAllAggrResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.SingleDataSet;
@@ -47,6 +50,8 @@ import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.ValueIterator;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -59,14 +64,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
@@ -78,6 +77,7 @@ public class AggregationExecutor {
protected List<String> aggregations;
protected IExpression expression;
protected boolean ascending;
+ private final TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
protected QueryContext context;
protected AggregateResult[] aggregateResultList;
@@ -101,6 +101,7 @@ public class AggregationExecutor {
/** execute aggregate function with only time filter or no filter. */
public QueryDataSet executeWithoutValueFilter(AggregationPlan aggregationPlan)
throws StorageEngineException, IOException, QueryProcessException {
+ // TODO: store the params of aggregationPlan
Filter timeFilter = null;
if (expression != null) {
@@ -155,13 +156,27 @@ public class AggregationExecutor {
throws IOException, QueryProcessException, StorageEngineException {
List<AggregateResult> ascAggregateResultList = new ArrayList<>();
List<AggregateResult> descAggregateResultList = new ArrayList<>();
+ ValidityAggrResult validityAggrResult = null;
+ ValidityAllAggrResult validityAggrALLResult = null;
boolean[] isAsc = new boolean[aggregateResultList.length];
+ boolean[] isValidity = new boolean[aggregateResultList.length];
TSDataType tsDataType = dataTypes.get(indexes.get(0));
for (int i : indexes) {
// construct AggregateResult
AggregateResult aggregateResult =
AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
+ // TODO: revise AAggregateResultFactory, Type
+ // TODO: if (getAggregationType() == ...) {}
+ if (aggregateResult.getAggregationType() == AggregationType.VALIDITY) {
+ validityAggrResult = (ValidityAggrResult) aggregateResult;
+ isValidity[i] = true;
+ continue;
+ } else if (aggregateResult.getAggregationType() == AggregationType.VALIDITYALL) {
+ validityAggrALLResult = (ValidityAllAggrResult) aggregateResult;
+ isValidity[i] = true;
+ continue;
+ }
if (aggregateResult.isAscending()) {
ascAggregateResultList.add(aggregateResult);
isAsc[i] = true;
@@ -178,15 +193,529 @@ public class AggregationExecutor {
ascAggregateResultList,
descAggregateResultList,
null);
-
+ if (validityAggrResult != null) {
+ aggregateValidity(
+ seriesPath,
+ allMeasurementsInDevice,
+ context,
+ timeFilter,
+ tsDataType,
+ validityAggrResult,
+ null);
+ } else if (validityAggrALLResult != null) {
+ aggregateValidityALL(
+ seriesPath,
+ allMeasurementsInDevice,
+ context,
+ timeFilter,
+ tsDataType,
+ validityAggrALLResult,
+ null);
+ }
int ascIndex = 0;
int descIndex = 0;
for (int i : indexes) {
- aggregateResultList[i] =
- isAsc[i]
- ? ascAggregateResultList.get(ascIndex++)
- : descAggregateResultList.get(descIndex++);
+ if (isValidity[i]) {
+ if (validityAggrResult != null) {
+ aggregateResultList[i] = validityAggrResult;
+ } else {
+ aggregateResultList[i] = validityAggrALLResult;
+ }
+ } else {
+ aggregateResultList[i] =
+ (isAsc[i]
+ ? ascAggregateResultList.get(ascIndex++)
+ : descAggregateResultList.get(descIndex++));
+ }
+ }
+ }
+
+ // 所有都查
+ private void aggregateValidityALL(
+ PartialPath seriesPath,
+ Set<String> measurements,
+ QueryContext context,
+ Filter timeFilter,
+ TSDataType tsDataType,
+ ValidityAllAggrResult validityAllAggrResult,
+ TsFileFilter fileFilter)
+ throws QueryProcessException, StorageEngineException, IOException {
+ // construct series reader without value filter
+ QueryDataSource queryDataSource =
+ QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
+ if (fileFilter != null) {
+ QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
+ }
+ // update filter by TTL
+ timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+ IAggregateReader seriesReader =
+ new SeriesAggregateReader(
+ seriesPath,
+ measurements,
+ tsDataType,
+ context,
+ queryDataSource,
+ timeFilter,
+ null,
+ null,
+ true);
+ int res = 0;
+ int len = 0;
+ while (seriesReader.hasNextFile()) {
+ while (seriesReader.hasNextChunk()) {
+ while (seriesReader.hasNextPage()) {
+ if (seriesReader.canUseCurrentPageStatistics()) {
+ Statistics pageStatistic = seriesReader.currentPageStatistics();
+ res += pageStatistic.getValidityErrors();
+ len += pageStatistic.getCount();
+ }
+ }
+ }
+ }
+ System.out.println(1 - (double) res / len);
+ }
+
+ // validity
+ private void aggregateValidity(
+ PartialPath seriesPath,
+ Set<String> measurements,
+ QueryContext context,
+ Filter timeFilter,
+ TSDataType tsDataType,
+ ValidityAggrResult validityAggrResult,
+ TsFileFilter fileFilter)
+ throws QueryProcessException, StorageEngineException, IOException {
+ // construct series reader without value filter
+ QueryDataSource queryDataSource =
+ QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
+ if (fileFilter != null) {
+ QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
+ }
+ // update filter by TTL
+ timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+ IAggregateReader seriesReader =
+ new SeriesAggregateReader(
+ seriesPath,
+ measurements,
+ tsDataType,
+ context,
+ queryDataSource,
+ timeFilter,
+ null,
+ null,
+ true);
+
+ // unseq or seq
+ int nowPageIndex = 0;
+ List<Statistics> statisticsList = new ArrayList<>();
+ List<Integer> indexUsed = new ArrayList<>();
+ List<Integer> unseqIndex = new ArrayList<>();
+ List<Integer> unseqStartIndex = new ArrayList<>();
+ int cnt = 0;
+
+ boolean unseqMerge = true;
+ while (seriesReader.hasNextFile()) {
+ while (seriesReader.hasNextChunk()) {
+ while (seriesReader.hasNextPage()) {
+ cnt += 1;
+ // seq
+ if (seriesReader.canUseCurrentPageStatistics()) {
+ unseqMerge = false;
+ if (unseqIndex.size() > 0) {
+ indexUsed.add(unseqIndex.get(0));
+ unseqStartIndex.add(unseqIndex.get(0));
+ unseqIndex.clear();
+ }
+ indexUsed.add(nowPageIndex);
+ Statistics pageStatistic = seriesReader.currentPageStatistics();
+ if (nowPageIndex == 0) {
+ // if (!pageStatistic.sameConstraints(
+ // tsFileConfig.getsMax(),
+ // tsFileConfig.getSmin(),
+ // tsFileConfig.getXMax(),
+ // tsFileConfig.getXMin())) {
+ // System.out.println("constraints inconsistent with TsFile");
+ // return;
+ // }
+ validityAggrResult.setStatisticsInstance(pageStatistic);
+ nowPageIndex++;
+ seriesReader.skipCurrentPage();
+ continue;
+ }
+ // validityAggrResult.updateDPAndReverseDPAll();
+ statisticsList.add(validityAggrResult.getStatisticsInstance());
+ validityAggrResult.setStatisticsInstance(pageStatistic);
+ seriesReader.skipCurrentPage();
+ nowPageIndex++;
+ continue;
+ } else {
+ System.out.println("unSeq");
+ if (!unseqMerge && nowPageIndex > 0) {
+ statisticsList.add(validityAggrResult.getStatisticsInstance());
+ validityAggrResult.reset();
+ }
+ unseqMerge = true;
+ }
+ IBatchDataIterator batchDataIterator = seriesReader.nextPage().getBatchDataIterator();
+ validityAggrResult.updateResultFromPageData(batchDataIterator);
+ unseqIndex.add(nowPageIndex);
+ nowPageIndex++;
+ batchDataIterator.reset();
+ }
+ }
+ }
+ if (unseqIndex.size() > 0) {
+ indexUsed.add(unseqIndex.get(0));
+ unseqStartIndex.add(unseqIndex.get(0));
+ unseqIndex.clear();
+ // validityAggrResult.updateDPAndReverseDPAll();
+ }
+ statisticsList.add(validityAggrResult.getStatisticsInstance());
+
+ // can merge or not merge
+ boolean[] indexBoolean = new boolean[nowPageIndex];
+ for (int j = nowPageIndex - 1; j > 0; j--) {
+ indexBoolean[j] = indexUsed.contains(j);
+ }
+
+ System.out.println("page num:" + cnt);
+ System.out.println("indexUsed-length:" + indexUsed.size());
+ System.out.println("unseq-length:" + unseqStartIndex.size());
+ System.out.println("statisticList-length:" + statisticsList.size());
+
+ List<Statistics> splitStatisticsList = new ArrayList<>();
+
+ int p = 0, index = 0;
+ int c = IoTDBDescriptor.getInstance().getConfig().getParamC();
+ System.out.println("c=" + c);
+ List<Integer> newIndexList = new ArrayList<>();
+ Map<Integer, Integer> newIndexToIndexUsed = new HashMap<>();
+ while (true) {
+ // only re-split overlapping segments
+ if (p >= statisticsList.size() || p >= indexUsed.size()) break;
+ Statistics pageStatistic = statisticsList.get(p);
+ if (!unseqStartIndex.contains(indexUsed.get(p))) {
+ splitStatisticsList.add(pageStatistic);
+ newIndexToIndexUsed.put(index, indexUsed.get(p));
+ p++;
+ index++;
+ continue;
+ }
+ System.out.println("Splitting one overlapping page.");
+
+ List<Long> timeWindow = pageStatistic.getTimeWindow();
+ List<Double> valueWindow = pageStatistic.getValueWindow();
+ boolean[] ifValueViolation = new boolean[timeWindow.size()];
+ boolean[] ifSpeedViolation = new boolean[timeWindow.size()];
+
+ double speedAVG = pageStatistic.getSpeedAVG(), speedSTD = pageStatistic.getSpeedSTD();
+ double smax = speedAVG + 3 * speedSTD;
+ double smin = speedAVG - 3 * speedSTD;
+ double xmin = pageStatistic.getxMin(), xmax = pageStatistic.getxMax();
+ if (Math.abs(smax) > Math.abs(smin)) {
+ smin = -(speedAVG + 3 * speedSTD);
+ } else {
+ smax = -(speedAVG - 3 * speedSTD);
+ }
+
+ int vioCnt = 0;
+ List<Double> speeds = new ArrayList<>();
+ // determine value violation points
+ for (int j = 0; j < timeWindow.size(); j++) {
+ if (valueWindow.get(j) >= xmin && valueWindow.get(j) <= xmax) ifValueViolation[j] = false;
+ else ifValueViolation[j] = true;
+ if (j > 0)
+ speeds.add(
+ (valueWindow.get(j) - valueWindow.get(j - 1))
+ / (timeWindow.get(j) - timeWindow.get(j - 1)));
+ }
+
+ // determine speed violation points
+ if (speeds.get(0) >= smin && speeds.get(0) <= smax) ifSpeedViolation[0] = false;
+ else ifSpeedViolation[0] = true;
+ for (int j = 1; j < timeWindow.size() - 1; j++) {
+ if (speeds.get(j - 1) >= smin
+ && speeds.get(j - 1) <= smax
+ && speeds.get(j) >= smin
+ && speeds.get(j) <= smax) ifSpeedViolation[j] = false;
+ else ifSpeedViolation[j] = true;
+ }
+ if (speeds.get(speeds.size() - 1) >= smin && speeds.get(speeds.size() - 1) <= smax)
+ ifSpeedViolation[timeWindow.size() - 1] = false;
+ else ifSpeedViolation[timeWindow.size() - 1] = true;
+ System.out.println("speed violation points:" + vioCnt);
+
+ // optimistic split
+ int s = 0, e = 0; // double pointers
+ int consecCnt = 0, violationCnt = 0;
+ List<Integer> violationIndexList = new ArrayList<>();
+
+ while (e < ifValueViolation.length) {
+ if (!ifValueViolation[e] && !ifSpeedViolation[e]) consecCnt += 1;
+ else {
+ consecCnt = 0;
+ violationIndexList.add(e);
+ violationCnt += 1;
+ }
+ if (consecCnt == c) {
+ ValueIterator valueIterator =
+ new ValueIterator(valueWindow.subList(s, e - c / 2 + 1).toArray());
+ long[] timestamps = new long[e - c / 2 - s + 1];
+ for (int k = 0; k < e - c / 2 - s + 1; k++) {
+ timestamps[k] = timeWindow.get(s + k);
+ }
+ validityAggrResult.reset();
+ validityAggrResult.updateResultUsingValues(timestamps, e - c / 2 - s + 1, valueIterator);
+
+ if (violationCnt < 2) validityAggrResult.setValidityErrors(violationCnt);
+ else validityAggrResult.updateDPAndReverseDPAll();
+ splitStatisticsList.add(validityAggrResult.getStatisticsInstance());
+ newIndexList.add(index);
+ index++;
+
+ s = e - c / 2 + 1;
+ consecCnt = 0;
+ violationCnt = 0;
+ violationIndexList.clear();
+ }
+ e++;
+ }
+ if (e == ifValueViolation.length && s < e) {
+ ValueIterator valueIterator = new ValueIterator(valueWindow.subList(s, e).toArray());
+ long[] timestamps = new long[e - s + 1];
+ for (int k = 0; k < e - s; k++) {
+ timestamps[k] = timeWindow.get(s + k);
+ }
+ validityAggrResult.reset();
+ validityAggrResult.updateResultUsingValues(timestamps, e - s, valueIterator);
+ if (violationCnt < 2) validityAggrResult.setValidityErrors(violationCnt);
+ else validityAggrResult.updateDPAndReverseDPAll();
+ splitStatisticsList.add(validityAggrResult.getStatisticsInstance());
+ newIndexList.add(index);
+ index++;
+ }
+ p++;
+ }
+ System.out.println("splitStatisticsList:" + splitStatisticsList.size());
+
+ int i = 0;
+ List<Statistics> finalStatisticsList = new ArrayList<>();
+ finalStatisticsList = statisticsList;
+ boolean[] canMerge = new boolean[splitStatisticsList.size()];
+ for (int k = 0; k < splitStatisticsList.size(); k++) {
+ canMerge[k] = true;
+ }
+ validityAggrResult.reset();
+ int pagesize = 0;
+ while (i < splitStatisticsList.size()) {
+ // TODO: check merge code
+ Statistics pageStatistic = splitStatisticsList.get(i);
+ if (validityAggrResult.checkMergeable(pageStatistic)) {
+ finalStatisticsList.add(pageStatistic);
+ validityAggrResult.setStatisticsInstance(pageStatistic);
+ pagesize++;
+ System.out.println("can merge");
+ i++;
+ } else {
+ System.out.println("can not Merge");
+ canMerge[i] = false;
+
+ for (int j = i - 1; j >= 0; j--) {
+ if (!canMerge[j] && j > 0) {
+ continue;
+ }
+ System.out.println("Merging from " + j + "-th sub-page to " + i + "-th sub-page");
+
+ validityAggrResult.reset();
+ if (finalStatisticsList.size() > 0) {
+ finalStatisticsList.remove(finalStatisticsList.size() - 1);
+ }
+ for (int pageIndex = j; pageIndex <= i; pageIndex++) {
+ if (newIndexList.contains(pageIndex)) {
+ Statistics curStatistics = splitStatisticsList.get(pageIndex);
+ validityAggrResult.updateResultFromStatistics(curStatistics);
+ } else {
+ int originPageIndex = newIndexToIndexUsed.get(pageIndex);
+ IAggregateReader seriesReaderTemp =
+ new SeriesAggregateReader(
+ seriesPath,
+ measurements,
+ tsDataType,
+ context,
+ queryDataSource,
+ timeFilter,
+ null,
+ null,
+ true);
+ int curIndex = 0;
+ while (seriesReaderTemp.hasNextFile()) {
+ while (seriesReaderTemp.hasNextChunk()) {
+ while (seriesReaderTemp.hasNextPage()) {
+ if (curIndex < originPageIndex) {
+ seriesReaderTemp.nextPage();
+ curIndex++;
+ } else if (curIndex > originPageIndex) break;
+ else {
+ IBatchDataIterator batchDataIterator =
+ seriesReaderTemp.nextPage().getBatchDataIterator();
+ validityAggrResult.updateResultFromPageData(batchDataIterator);
+ batchDataIterator.reset();
+ curIndex++;
+ }
+ }
+ if (curIndex > originPageIndex) break;
+ }
+ if (curIndex > originPageIndex) break;
+ }
+ }
+ }
+ if (validityAggrResult.getStatisticsInstance().getCount() > c) {
+ int errors =
+ optimisticSplit(validityAggrResult.getStatisticsInstance(), validityAggrResult, c);
+ validityAggrResult.reset();
+ validityAggrResult.setValidityErrors(errors);
+ } else {
+ validityAggrResult.updateDPAndReverseDPAll();
+ }
+ if (finalStatisticsList.size() == 0) {
+ break;
+ }
+ if (finalStatisticsList
+ .get(finalStatisticsList.size() - 1)
+ .checkMergeable(validityAggrResult.getStatisticsInstance())) {
+ break;
+ } else {
+ canMerge[j] = false;
+ }
+ }
+ finalStatisticsList.add(validityAggrResult.getStatisticsInstance());
+ i++;
+ }
+ }
+
+ validityAggrResult.reset();
+ i = 0;
+ while (i < finalStatisticsList.size()) {
+ // System.out.println("merge process");
+ validityAggrResult.updateResultFromStatistics(finalStatisticsList.get(i));
+ i++;
+ }
+ // System.out.println(validityAggrResult.getStatisticsInstance().getRepairSelfLast());
+ // double smax =
+ // validityAggrResult.getStatisticsInstance().getSpeedAVG()
+ // + 3 * validityAggrResult.getStatisticsInstance().getSpeedSTD();
+ // double smin =
+ // validityAggrResult.getStatisticsInstance().getSpeedAVG()
+ // - 3 * validityAggrResult.getStatisticsInstance().getSpeedSTD();
+ // if (tsFileConfig.isUsePreSpeed()) {
+ // System.out.println("smax:" + tsFileConfig.getsMax() + "smin:" + tsFileConfig.getSmin());
+ // } else {
+ // System.out.println("smax:" + smax + "smin:" + smin);
+ // }
+ // System.out.println(finalStatisticsList.size());
+ }
+
+ private int optimisticSplit(
+ Statistics pageStatistic, ValidityAggrResult validityAggrResult, int c) {
+ List<Long> timeWindow = pageStatistic.getTimeWindow();
+ List<Double> valueWindow = pageStatistic.getValueWindow();
+ boolean[] ifValueViolation = new boolean[timeWindow.size()];
+ boolean[] ifSpeedViolation = new boolean[timeWindow.size()];
+
+ double preSpeed,
+ succSpeed,
+ speedAVG = pageStatistic.getSpeedAVG(),
+ speedSTD = pageStatistic.getSpeedSTD();
+ double smax = speedAVG + 3 * speedSTD;
+ double smin = speedAVG - 3 * speedSTD;
+ double xmin = pageStatistic.getxMin(), xmax = pageStatistic.getxMax();
+ if (Math.abs(smax) > Math.abs(smin)) {
+ smin = -(speedAVG + 3 * speedSTD);
+ } else {
+ smax = -(speedAVG - 3 * speedSTD);
+ }
+
+ int vioCnt = 0;
+ List<Double> speeds = new ArrayList<>();
+ // determine value violation points
+ for (int j = 0; j < timeWindow.size(); j++) {
+ if (valueWindow.get(j) >= xmin && valueWindow.get(j) <= xmax) ifValueViolation[j] = false;
+ else ifValueViolation[j] = true;
+ if (j > 0)
+ speeds.add(
+ (valueWindow.get(j) - valueWindow.get(j - 1))
+ / (timeWindow.get(j) - timeWindow.get(j - 1)));
+ }
+
+ // determine speed violation points
+ if (speeds.get(0) >= smin && speeds.get(0) <= smax) ifSpeedViolation[0] = false;
+ else ifSpeedViolation[0] = true;
+ for (int j = 1; j < timeWindow.size() - 1; j++) {
+ if (speeds.get(j - 1) >= smin
+ && speeds.get(j - 1) <= smax
+ && speeds.get(j) >= smin
+ && speeds.get(j) <= smax) ifSpeedViolation[j] = false;
+ else ifSpeedViolation[j] = true;
+ }
+ if (speeds.get(speeds.size() - 1) >= smin && speeds.get(speeds.size() - 1) <= smax)
+ ifSpeedViolation[timeWindow.size() - 1] = false;
+ else ifSpeedViolation[timeWindow.size() - 1] = true;
+ System.out.println("speed violation points:" + vioCnt);
+
+ // optimistic split
+ int s = 0, e = 0; // double pointers
+ int consecCnt = 0, violationCnt = 0;
+ List<Integer> violationIndexList = new ArrayList<>();
+ List<Statistics> resStatisticsList = new ArrayList<>();
+
+ while (e < ifValueViolation.length) {
+ if (!ifValueViolation[e] && !ifSpeedViolation[e]) consecCnt += 1;
+ else {
+ consecCnt = 0;
+ violationIndexList.add(e);
+ violationCnt += 1;
+ }
+ if (consecCnt == c) {
+ ValueIterator valueIterator =
+ new ValueIterator(valueWindow.subList(s, e - c / 2 + 1).toArray());
+ long[] timestamps = new long[e - c / 2 - s + 1];
+ for (int k = 0; k < e - c / 2 - s + 1; k++) {
+ timestamps[k] = timeWindow.get(s + k);
+ }
+ validityAggrResult.reset();
+ validityAggrResult.updateResultUsingValues(timestamps, e - c / 2 - s + 1, valueIterator);
+
+ if (violationCnt < 2) validityAggrResult.setValidityErrors(violationCnt);
+ else validityAggrResult.updateDPAndReverseDPAll();
+ resStatisticsList.add(validityAggrResult.getStatisticsInstance());
+
+ s = e - c / 2 + 1;
+ consecCnt = 0;
+ violationCnt = 0;
+ violationIndexList.clear();
+ }
+ e++;
+ }
+ if (e == ifValueViolation.length && s < e) {
+ ValueIterator valueIterator = new ValueIterator(valueWindow.subList(s, e).toArray());
+ long[] timestamps = new long[e - s + 1];
+ for (int k = 0; k < e - s; k++) {
+ timestamps[k] = timeWindow.get(s + k);
+ }
+ validityAggrResult.reset();
+ validityAggrResult.updateResultUsingValues(timestamps, e - s, valueIterator);
+ if (violationCnt < 2) validityAggrResult.setValidityErrors(violationCnt);
+ else validityAggrResult.updateDPAndReverseDPAll();
+ resStatisticsList.add(validityAggrResult.getStatisticsInstance());
+ }
+ int cnt = 0, errors = 0;
+ for (int k = 0; k < resStatisticsList.size(); k++) {
+ cnt += resStatisticsList.get(k).getCount();
+ errors += resStatisticsList.get(k).getValidityErrors();
}
+ return errors;
}
protected void aggregateOneAlignedSeries(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
index 987cdcd451..360bb82f25 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
@@ -67,6 +67,10 @@ public class AggregateResultFactory {
return !ascending
? new LastValueDescAggrResult(dataType)
: new LastValueAggrResult(dataType);
+ case SQLConstant.VALIDITY:
+ return new ValidityAggrResult(dataType);
+ case SQLConstant.VALIDITYALL:
+ return new ValidityAllAggrResult(dataType);
default:
throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
}
@@ -98,6 +102,10 @@ public class AggregateResultFactory {
return new SumAggrResult(dataType);
case SQLConstant.LAST_VALUE:
return new LastValueDescAggrResult(dataType);
+ case SQLConstant.VALIDITY:
+ return new ValidityAggrResult(dataType);
+ case SQLConstant.VALIDITYALL:
+ return new ValidityAllAggrResult(dataType);
default:
throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
}
@@ -130,6 +138,10 @@ public class AggregateResultFactory {
return new MinValueAggrResult(dataType);
case EXTREME:
return new ExtremeAggrResult(dataType);
+ case VALIDITY:
+ return new ValidityAggrResult(dataType);
+ case VALIDITYALL:
+ return new ValidityAllAggrResult(dataType);
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + aggregationType.name());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java
new file mode 100644
index 0000000000..4ef688a7e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.filter.executor;
+
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+public interface IPlanExecutor {
+
+ /**
+ * process query plan of qp layer, construct queryDataSet.
+ *
+ * @param queryPlan QueryPlan
+ * @return QueryDataSet
+ */
+ QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
+ throws IOException, StorageEngineException, QueryFilterOptimizationException,
+ QueryProcessException, MetadataException, SQLException, TException, InterruptedException;
+
+ /**
+ * Process Non-Query Physical plan, including insert/update/delete operation of
+ * data/metadata/Privilege
+ *
+ * @param plan Physical Non-Query Plan
+ */
+ boolean processNonQuery(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException, StorageEngineException;
+
+ /**
+ * execute update command and return whether the operator is successful.
+ *
+ * @param path : update series seriesPath
+ * @param startTime start time in update command
+ * @param endTime end time in update command
+ * @param value - in type of string
+ */
+ void update(PartialPath path, long startTime, long endTime, String value)
+ throws QueryProcessException;
+
+ /**
+ * execute delete command and return whether the operator is successful.
+ *
+ * @param deletePlan physical delete plan
+ */
+ void delete(DeletePlan deletePlan) throws QueryProcessException;
+
+ /**
+ * execute delete command and return whether the operator is successful.
+ *
+ * @param path delete series seriesPath
+ * @param startTime start time in delete command
+ * @param endTime end time in delete command
+ * @param planIndex index of the deletion plan
+ * @param partitionFilter specify involving time partitions, if null, all partitions are involved
+ */
+ void delete(
+ PartialPath path,
+ long startTime,
+ long endTime,
+ long planIndex,
+ TimePartitionFilter partitionFilter)
+ throws QueryProcessException;
+
+ /**
+ * execute insert command and return whether the operator is successful.
+ *
+ * @param insertRowPlan physical insert plan
+ */
+ void insert(InsertRowPlan insertRowPlan) throws QueryProcessException;
+
+ /**
+ * execute insert command and return whether the operator is successful.
+ *
+ * @param insertRowsPlan physical insert rows plan, which contains multi insertRowPlans
+ */
+ void insert(InsertRowsPlan insertRowsPlan) throws QueryProcessException;
+
+ /**
+ * execute insert command and return whether the operator is successful.
+ *
+ * @param insertRowsOfOneDevicePlan physical insert plan
+ */
+ void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException;
+
+ /**
+ * execute batch insert plan
+ *
+ * @throws BatchProcessException when some of the rows failed
+ */
+ void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException;
+
+ /**
+ * execute multi batch insert plan
+ *
+ * @throws QueryProcessException when some of the rows failed
+ */
+ void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan) throws QueryProcessException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java
new file mode 100644
index 0000000000..84f8b3c5b4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java
@@ -0,0 +1,2176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.filter.executor;
+
+import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
+import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.db.auth.entity.PathPrivilege;
+import org.apache.iotdb.db.auth.entity.Role;
+import org.apache.iotdb.db.auth.entity.User;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.cq.ContinuousQueryService;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager.TaskStatus;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
+import org.apache.iotdb.db.exception.QueryIdNotExsitException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.TriggerExecutionException;
+import org.apache.iotdb.db.exception.TriggerManagementException;
+import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePartitionPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDAFPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.CountPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
+import org.apache.iotdb.db.qp.physical.sys.MergePlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SettlePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildNodesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowLockInfoPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
+import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.db.query.dataset.ShowContinuousQueriesResult;
+import org.apache.iotdb.db.query.dataset.ShowDevicesDataSet;
+import org.apache.iotdb.db.query.dataset.ShowTimeseriesDataSet;
+import org.apache.iotdb.db.query.dataset.SingleDataSet;
+import org.apache.iotdb.db.query.executor.IQueryRouter;
+import org.apache.iotdb.db.query.executor.QueryRouter;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationInformation;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.SettleService;
+import org.apache.iotdb.db.tools.TsFileRewriteTool;
+import org.apache.iotdb.db.utils.AuthUtils;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.EmptyDataSet;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUERY_SQL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
+import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
+import static org.apache.iotdb.rpc.TSStatusCode.INTERNAL_SERVER_ERROR;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+public class PlanExecutor implements IPlanExecutor {
+
+ private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
+ private static final Logger AUDIT_LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
+ // for data query
+ protected IQueryRouter queryRouter;
+ // for administration
+ private final IAuthorizer authorizer;
+
+ private ThreadPoolExecutor insertionPool;
+
+ private static final String INSERT_MEASUREMENTS_FAILED_MESSAGE = "failed to insert measurements ";
+
+ public PlanExecutor() throws QueryProcessException {
+ queryRouter = new QueryRouter();
+ try {
+ authorizer = BasicAuthorizer.getInstance();
+ } catch (AuthException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ }
+
+ @Override
+ public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
+ throws IOException, StorageEngineException, QueryFilterOptimizationException,
+ QueryProcessException, MetadataException, InterruptedException {
+ if (queryPlan instanceof QueryPlan) {
+ return processDataQuery((QueryPlan) queryPlan, context);
+ } else if (queryPlan instanceof AuthorPlan) {
+ return processAuthorQuery((AuthorPlan) queryPlan);
+ } else if (queryPlan instanceof ShowPlan) {
+ return processShowQuery((ShowPlan) queryPlan, context);
+ } else {
+ throw new QueryProcessException(String.format("Unrecognized query plan %s", queryPlan));
+ }
+ }
+
+ @Override
+ public boolean processNonQuery(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+ switch (plan.getOperatorType()) {
+ case DELETE:
+ delete((DeletePlan) plan);
+ return true;
+ case INSERT:
+ insert((InsertRowPlan) plan);
+ return true;
+ case BATCH_INSERT_ONE_DEVICE:
+ insert((InsertRowsOfOneDevicePlan) plan);
+ return true;
+ case BATCH_INSERT_ROWS:
+ insert((InsertRowsPlan) plan);
+ return true;
+ case BATCH_INSERT:
+ insertTablet((InsertTabletPlan) plan);
+ return true;
+ case MULTI_BATCH_INSERT:
+ insertTablet((InsertMultiTabletPlan) plan);
+ return true;
+ case CREATE_ROLE:
+ case DELETE_ROLE:
+ case CREATE_USER:
+ case REVOKE_USER_ROLE:
+ case REVOKE_ROLE_PRIVILEGE:
+ case REVOKE_USER_PRIVILEGE:
+ case GRANT_ROLE_PRIVILEGE:
+ case GRANT_USER_PRIVILEGE:
+ case GRANT_USER_ROLE:
+ case MODIFY_PASSWORD:
+ case DELETE_USER:
+ AuthorPlan author = (AuthorPlan) plan;
+ return operateAuthor(author);
+ case GRANT_WATERMARK_EMBEDDING:
+ return operateWatermarkEmbedding(((DataAuthPlan) plan).getUsers(), true);
+ case REVOKE_WATERMARK_EMBEDDING:
+ return operateWatermarkEmbedding(((DataAuthPlan) plan).getUsers(), false);
+ case DELETE_TIMESERIES:
+ return deleteTimeSeries((DeleteTimeSeriesPlan) plan);
+ case CREATE_TIMESERIES:
+ return createTimeSeries((CreateTimeSeriesPlan) plan);
+ case CREATE_ALIGNED_TIMESERIES:
+ return createAlignedTimeSeries((CreateAlignedTimeSeriesPlan) plan);
+ case CREATE_MULTI_TIMESERIES:
+ return createMultiTimeSeries((CreateMultiTimeSeriesPlan) plan);
+ case ALTER_TIMESERIES:
+ return alterTimeSeries((AlterTimeSeriesPlan) plan);
+ case SET_STORAGE_GROUP:
+ return setStorageGroup((SetStorageGroupPlan) plan);
+ case DELETE_STORAGE_GROUP:
+ return deleteStorageGroups((DeleteStorageGroupPlan) plan);
+ case TTL:
+ operateTTL((SetTTLPlan) plan);
+ return true;
+ case LOAD_CONFIGURATION:
+ loadConfiguration((LoadConfigurationPlan) plan);
+ return true;
+ case LOAD_FILES:
+ operateLoadFiles((OperateFilePlan) plan);
+ return true;
+ case REMOVE_FILE:
+ operateRemoveFile((OperateFilePlan) plan);
+ return true;
+ case UNLOAD_FILE:
+ operateUnloadFile((OperateFilePlan) plan);
+ return true;
+ case FLUSH:
+ operateFlush((FlushPlan) plan);
+ return true;
+ case MERGE:
+ case FULL_MERGE:
+ operateMerge((MergePlan) plan);
+ return true;
+ case SET_SYSTEM_MODE:
+ operateSetSystemMode((SetSystemModePlan) plan);
+ return true;
+ case CLEAR_CACHE:
+ operateClearCache();
+ return true;
+ case DELETE_PARTITION:
+ DeletePartitionPlan p = (DeletePartitionPlan) plan;
+ TimePartitionFilter filter =
+ (storageGroupName, partitionId) ->
+ storageGroupName.equals(
+ ((DeletePartitionPlan) plan).getStorageGroupName().getFullPath())
+ && p.getPartitionId().contains(partitionId);
+ StorageEngine.getInstance()
+ .removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
+ return true;
+ case CREATE_SCHEMA_SNAPSHOT:
+ operateCreateSnapshot();
+ return true;
+ case CREATE_FUNCTION:
+ return operateCreateFunction((CreateFunctionPlan) plan);
+ case DROP_FUNCTION:
+ return operateDropFunction((DropFunctionPlan) plan);
+ case CREATE_TRIGGER:
+ return operateCreateTrigger((CreateTriggerPlan) plan);
+ case DROP_TRIGGER:
+ return operateDropTrigger((DropTriggerPlan) plan);
+ case START_TRIGGER:
+ return operateStartTrigger((StartTriggerPlan) plan);
+ case STOP_TRIGGER:
+ return operateStopTrigger((StopTriggerPlan) plan);
+ case KILL:
+ try {
+ operateKillQuery((KillQueryPlan) plan);
+ } catch (QueryIdNotExsitException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ return true;
+ case CREATE_TEMPLATE:
+ return createTemplate((CreateTemplatePlan) plan);
+ case APPEND_TEMPLATE:
+ return appendTemplate((AppendTemplatePlan) plan);
+ case PRUNE_TEMPLATE:
+ return pruneTemplate((PruneTemplatePlan) plan);
+ case SET_TEMPLATE:
+ return setTemplate((SetTemplatePlan) plan);
+ case ACTIVATE_TEMPLATE:
+ return activateTemplate((ActivateTemplatePlan) plan);
+ case UNSET_TEMPLATE:
+ return unsetTemplate((UnsetTemplatePlan) plan);
+ case CREATE_CONTINUOUS_QUERY:
+ return operateCreateContinuousQuery((CreateContinuousQueryPlan) plan);
+ case DROP_CONTINUOUS_QUERY:
+ return operateDropContinuousQuery((DropContinuousQueryPlan) plan);
+ case SETTLE:
+ settle((SettlePlan) plan);
+ return true;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("operation %s is not supported", plan.getOperatorType()));
+ }
+ }
+
+ private boolean createTemplate(CreateTemplatePlan createTemplatePlan)
+ throws QueryProcessException {
+ try {
+ IoTDB.metaManager.createSchemaTemplate(createTemplatePlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean appendTemplate(AppendTemplatePlan plan) throws QueryProcessException {
+ try {
+ IoTDB.metaManager.appendSchemaTemplate(plan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean pruneTemplate(PruneTemplatePlan plan) throws QueryProcessException {
+ try {
+ IoTDB.metaManager.pruneSchemaTemplate(plan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean setTemplate(SetTemplatePlan setTemplatePlan) throws QueryProcessException {
+ try {
+ IoTDB.metaManager.setSchemaTemplate(setTemplatePlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean activateTemplate(ActivateTemplatePlan activateTemplatePlan)
+ throws QueryProcessException {
+ try {
+ IoTDB.metaManager.setUsingSchemaTemplate(activateTemplatePlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean unsetTemplate(UnsetTemplatePlan unsetTemplatePlan) throws QueryProcessException {
+ try {
+ IoTDB.metaManager.unsetSchemaTemplate(unsetTemplatePlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean operateCreateFunction(CreateFunctionPlan plan) throws UDFRegistrationException {
+ UDFRegistrationService.getInstance().register(plan.getUdfName(), plan.getClassName(), true);
+ return true;
+ }
+
+ private boolean operateDropFunction(DropFunctionPlan plan) throws UDFRegistrationException {
+ UDFRegistrationService.getInstance().deregister(plan.getUdfName());
+ return true;
+ }
+
+ private boolean operateCreateTrigger(CreateTriggerPlan plan)
+ throws TriggerManagementException, TriggerExecutionException {
+ TriggerRegistrationService.getInstance().register(plan);
+ return true;
+ }
+
+ private boolean operateDropTrigger(DropTriggerPlan plan) throws TriggerManagementException {
+ TriggerRegistrationService.getInstance().deregister(plan);
+ return true;
+ }
+
+ private boolean operateStartTrigger(StartTriggerPlan plan)
+ throws TriggerManagementException, TriggerExecutionException {
+ TriggerRegistrationService.getInstance().activate(plan);
+ return true;
+ }
+
+ private boolean operateStopTrigger(StopTriggerPlan plan) throws TriggerManagementException {
+ TriggerRegistrationService.getInstance().inactivate(plan);
+ return true;
+ }
+
+ private void operateMerge(MergePlan plan) throws StorageEngineException {
+ if (plan.getOperatorType() == OperatorType.FULL_MERGE) {
+ StorageEngine.getInstance().mergeAll(true);
+ } else {
+ StorageEngine.getInstance().mergeAll(false);
+ }
+ }
+
+ private void operateClearCache() {
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
+ }
+
+ private void operateCreateSnapshot() {
+ IoTDB.metaManager.createMTreeSnapshot();
+ }
+
+ private void operateKillQuery(KillQueryPlan killQueryPlan) throws QueryIdNotExsitException {
+ QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+ long killQueryId = killQueryPlan.getQueryId();
+ if (killQueryId != -1) {
+ if (queryTimeManager.getQueryContextMap().get(killQueryId) != null) {
+ queryTimeManager.killQuery(killQueryId);
+ } else {
+ throw new QueryIdNotExsitException(
+ String.format(
+ "Query Id %d is not exist, please check it.", killQueryPlan.getQueryId()));
+ }
+ } else {
+ // if queryId is not specified, kill all running queries
+ if (!queryTimeManager.getQueryContextMap().isEmpty()) {
+ synchronized (queryTimeManager.getQueryContextMap()) {
+ List<Long> queryIdList = new ArrayList<>(queryTimeManager.getQueryContextMap().keySet());
+ for (Long queryId : queryIdList) {
+ queryTimeManager.killQuery(queryId);
+ }
+ }
+ }
+ }
+ }
+
+ private void operateSetSystemMode(SetSystemModePlan plan) {
+ IoTDBDescriptor.getInstance().getConfig().setReadOnly(plan.isReadOnly());
+ }
+
+ private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException {
+ if (plan.getPaths().isEmpty()) {
+ StorageEngine.getInstance().syncCloseAllProcessor();
+ } else {
+ flushSpecifiedStorageGroups(plan);
+ }
+
+ if (!plan.getPaths().isEmpty()) {
+ List<PartialPath> noExistSg = checkStorageGroupExist(plan.getPaths());
+ if (!noExistSg.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ noExistSg.forEach(storageGroup -> sb.append(storageGroup.getFullPath()).append(","));
+ throw new StorageGroupNotSetException(sb.subSequence(0, sb.length() - 1).toString(), true);
+ }
+ }
+ }
+
+ private boolean operateCreateContinuousQuery(CreateContinuousQueryPlan plan)
+ throws ContinuousQueryException {
+ return ContinuousQueryService.getInstance().register(plan, true);
+ }
+
+ private boolean operateDropContinuousQuery(DropContinuousQueryPlan plan)
+ throws ContinuousQueryException {
+ return ContinuousQueryService.getInstance().deregister(plan);
+ }
+
+ public static void flushSpecifiedStorageGroups(FlushPlan plan)
+ throws StorageGroupNotSetException {
+ Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupMap =
+ plan.getStorageGroupPartitionIds();
+ for (Entry<PartialPath, List<Pair<Long, Boolean>>> entry : storageGroupMap.entrySet()) {
+ PartialPath storageGroupName = entry.getKey();
+ // normal flush
+ if (entry.getValue() == null) {
+ if (plan.isSeq() == null) {
+ StorageEngine.getInstance()
+ .closeStorageGroupProcessor(storageGroupName, true, plan.isSync());
+ StorageEngine.getInstance()
+ .closeStorageGroupProcessor(storageGroupName, false, plan.isSync());
+ } else {
+ StorageEngine.getInstance()
+ .closeStorageGroupProcessor(storageGroupName, plan.isSeq(), plan.isSync());
+ }
+ }
+ // partition specified flush, for snapshot flush plan
+ else {
+ List<Pair<Long, Boolean>> partitionIdSequencePairs = entry.getValue();
+ for (Pair<Long, Boolean> pair : partitionIdSequencePairs) {
+ StorageEngine.getInstance()
+ .closeStorageGroupProcessor(storageGroupName, pair.left, pair.right, true);
+ }
+ }
+ }
+ }
+
+ protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
+ throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
+ IOException, InterruptedException {
+ QueryDataSet queryDataSet;
+ if (queryPlan instanceof AlignByDevicePlan) {
+ queryDataSet = getAlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
+ } else {
+ if (queryPlan.getPaths() == null || queryPlan.getPaths().isEmpty()) {
+ // no time series are selected, return EmptyDataSet
+ return new EmptyDataSet();
+ } else if (queryPlan instanceof UDAFPlan) {
+ UDAFPlan udafPlan = (UDAFPlan) queryPlan;
+ queryDataSet = queryRouter.udafQuery(udafPlan, context);
+ } else if (queryPlan instanceof UDTFPlan) {
+ UDTFPlan udtfPlan = (UDTFPlan) queryPlan;
+ queryDataSet = queryRouter.udtfQuery(udtfPlan, context);
+ } else if (queryPlan instanceof GroupByTimeFillPlan) {
+ GroupByTimeFillPlan groupByFillPlan = (GroupByTimeFillPlan) queryPlan;
+ queryDataSet = queryRouter.groupByFill(groupByFillPlan, context);
+ } else if (queryPlan instanceof GroupByTimePlan) {
+ GroupByTimePlan groupByTimePlan = (GroupByTimePlan) queryPlan;
+ queryDataSet = queryRouter.groupBy(groupByTimePlan, context);
+ } else if (queryPlan instanceof QueryIndexPlan) {
+ throw new QueryProcessException("Query index hasn't been supported yet");
+ } else if (queryPlan instanceof AggregationPlan) {
+ AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
+ queryDataSet = queryRouter.aggregate(aggregationPlan, context);
+ } else if (queryPlan instanceof FillQueryPlan) {
+ FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
+ queryDataSet = queryRouter.fill(fillQueryPlan, context);
+ } else if (queryPlan instanceof LastQueryPlan) {
+ queryDataSet = queryRouter.lastQuery((LastQueryPlan) queryPlan, context);
+ } else {
+ queryDataSet = queryRouter.rawDataQuery((RawDataQueryPlan) queryPlan, context);
+ }
+ }
+ queryDataSet.setRowLimit(queryPlan.getRowLimit());
+ queryDataSet.setRowOffset(queryPlan.getRowOffset());
+ queryDataSet.setWithoutAllNull(queryPlan.isWithoutAllNull());
+ queryDataSet.setWithoutAnyNull(queryPlan.isWithoutAnyNull());
+ return queryDataSet;
+ }
+
+ protected AlignByDeviceDataSet getAlignByDeviceDataSet(
+ AlignByDevicePlan plan, QueryContext context, IQueryRouter router) {
+ return new AlignByDeviceDataSet(plan, context, router);
+ }
+
+ protected QueryDataSet processShowQuery(ShowPlan showPlan, QueryContext context)
+ throws QueryProcessException, MetadataException {
+ switch (showPlan.getShowContentType()) {
+ case TTL:
+ return processShowTTLQuery((ShowTTLPlan) showPlan);
+ case FLUSH_TASK_INFO:
+ return processShowFlushTaskInfo();
+ case VERSION:
+ return processShowVersion();
+ case TIMESERIES:
+ return processShowTimeseries((ShowTimeSeriesPlan) showPlan, context);
+ case STORAGE_GROUP:
+ return processShowStorageGroup((ShowStorageGroupPlan) showPlan);
+ case LOCK_INFO:
+ return processShowLockInfo((ShowLockInfoPlan) showPlan);
+ case DEVICES:
+ return processShowDevices((ShowDevicesPlan) showPlan);
+ case CHILD_PATH:
+ return processShowChildPaths((ShowChildPathsPlan) showPlan);
+ case CHILD_NODE:
+ return processShowChildNodes((ShowChildNodesPlan) showPlan);
+ case COUNT_TIMESERIES:
+ return processCountTimeSeries((CountPlan) showPlan);
+ case COUNT_NODE_TIMESERIES:
+ return processCountNodeTimeSeries((CountPlan) showPlan);
+ case COUNT_DEVICES:
+ return processCountDevices((CountPlan) showPlan);
+ case COUNT_STORAGE_GROUP:
+ return processCountStorageGroup((CountPlan) showPlan);
+ case COUNT_NODES:
+ return processCountNodes((CountPlan) showPlan);
+ case MERGE_STATUS:
+ return processShowMergeStatus();
+ case QUERY_PROCESSLIST:
+ return processShowQueryProcesslist();
+ case FUNCTIONS:
+ return processShowFunctions((ShowFunctionsPlan) showPlan);
+ case TRIGGERS:
+ return processShowTriggers();
+ case CONTINUOUS_QUERY:
+ return processShowContinuousQueries();
+ default:
+ throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
+ }
+ }
+
+ private QueryDataSet processCountNodes(CountPlan countPlan) throws MetadataException {
+ int num = getNodesNumInGivenLevel(countPlan.getPath(), countPlan.getLevel());
+ return createSingleDataSet(COLUMN_COUNT, TSDataType.INT32, num);
+ }
+
+ private QueryDataSet processCountNodeTimeSeries(CountPlan countPlan) throws MetadataException {
+ // get the nodes that need to group by first
+ List<PartialPath> nodes = getNodesList(countPlan.getPath(), countPlan.getLevel());
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_COLUMN, false), new PartialPath(COLUMN_COUNT, false)),
+ Arrays.asList(TSDataType.TEXT, TSDataType.INT32));
+ for (PartialPath columnPath : nodes) {
+ RowRecord record = new RowRecord(0);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(columnPath.getFullPath()));
+ Field field1 = new Field(TSDataType.INT32);
+ // get the count of every group
+ field1.setIntV(getPathsNum(columnPath));
+ record.addField(field);
+ record.addField(field1);
+ listDataSet.putRecord(record);
+ }
+ return listDataSet;
+ }
+
+ private QueryDataSet processCountDevices(CountPlan countPlan) throws MetadataException {
+ int num = getDevicesNum(countPlan.getPath());
+ return createSingleDataSet(COLUMN_DEVICES, TSDataType.INT32, num);
+ }
+
+ private QueryDataSet processCountStorageGroup(CountPlan countPlan) throws MetadataException {
+ int num = getStorageGroupNum(countPlan.getPath());
+ return createSingleDataSet(COLUMN_STORAGE_GROUP, TSDataType.INT32, num);
+ }
+
+ private QueryDataSet createSingleDataSet(String columnName, TSDataType columnType, Object val) {
+ SingleDataSet singleDataSet =
+ new SingleDataSet(
+ Collections.singletonList(new PartialPath(columnName, false)),
+ Collections.singletonList(columnType));
+ Field field = new Field(columnType);
+ switch (columnType) {
+ case TEXT:
+ field.setBinaryV(((Binary) val));
+ break;
+ case FLOAT:
+ field.setFloatV(((float) val));
+ break;
+ case INT32:
+ field.setIntV(((int) val));
+ break;
+ case INT64:
+ field.setLongV(((long) val));
+ break;
+ case DOUBLE:
+ field.setDoubleV(((double) val));
+ break;
+ case BOOLEAN:
+ field.setBoolV(((boolean) val));
+ break;
+ default:
+ throw new UnSupportedDataTypeException("Unsupported data type" + columnType);
+ }
+ RowRecord record = new RowRecord(0);
+ record.addField(field);
+ singleDataSet.setRecord(record);
+ return singleDataSet;
+ }
+
+ protected int getDevicesNum(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getDevicesNum(path);
+ }
+
+ private int getStorageGroupNum(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getStorageGroupNum(path);
+ }
+
+ protected int getPathsNum(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getAllTimeseriesCount(path);
+ }
+
+ protected int getNodesNumInGivenLevel(PartialPath path, int level) throws MetadataException {
+ return IoTDB.metaManager.getNodesCountInGivenLevel(path, level);
+ }
+
+ protected List<MeasurementPath> getPathsName(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getMeasurementPaths(path);
+ }
+
+ protected List<PartialPath> getNodesList(PartialPath schemaPattern, int level)
+ throws MetadataException {
+ return IoTDB.metaManager.getNodesListInGivenLevel(schemaPattern, level);
+ }
+
+ private QueryDataSet processCountTimeSeries(CountPlan countPlan) throws MetadataException {
+ int num = getPathsNum(countPlan.getPath());
+ return createSingleDataSet(COLUMN_COUNT, TSDataType.INT32, num);
+ }
+
+ private QueryDataSet processShowDevices(ShowDevicesPlan showDevicesPlan)
+ throws MetadataException {
+ return new ShowDevicesDataSet(showDevicesPlan);
+ }
+
+ private QueryDataSet processShowChildPaths(ShowChildPathsPlan showChildPathsPlan)
+ throws MetadataException {
+ Set<String> childPathsList = getPathNextChildren(showChildPathsPlan.getPath());
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_CHILD_PATHS, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ for (String s : childPathsList) {
+ RowRecord record = new RowRecord(0);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(s));
+ record.addField(field);
+ listDataSet.putRecord(record);
+ }
+ return listDataSet;
+ }
+
+ protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getChildNodePathInNextLevel(path);
+ }
+
+ private QueryDataSet processShowChildNodes(ShowChildNodesPlan showChildNodesPlan)
+ throws MetadataException {
+ // getNodeNextChildren
+ Set<String> childNodesList = getNodeNextChildren(showChildNodesPlan.getPath());
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_CHILD_NODES, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ for (String s : childNodesList) {
+ RowRecord record = new RowRecord(0);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(s));
+ record.addField(field);
+ listDataSet.putRecord(record);
+ }
+ return listDataSet;
+ }
+
+ protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getChildNodeNameInNextLevel(path);
+ }
+
+ protected List<PartialPath> getStorageGroupNames(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getMatchedStorageGroups(path);
+ }
+
+ private QueryDataSet processShowStorageGroup(ShowStorageGroupPlan showStorageGroupPlan)
+ throws MetadataException {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_STORAGE_GROUP, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ List<PartialPath> storageGroupList = getStorageGroupNames(showStorageGroupPlan.getPath());
+ addToDataSet(storageGroupList, listDataSet);
+ return listDataSet;
+ }
+
+ private void addToDataSet(Collection<PartialPath> paths, ListDataSet dataSet) {
+ for (PartialPath s : paths) {
+ RowRecord record = new RowRecord(0);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(s.getFullPath()));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ }
+
+ private QueryDataSet processShowLockInfo(ShowLockInfoPlan showLockInfoPlan)
+ throws MetadataException {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_STORAGE_GROUP, false),
+ new PartialPath(COLUMN_LOCK_INFO, false)),
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
+ try {
+ List<PartialPath> storageGroupList = getStorageGroupNames(showLockInfoPlan.getPath());
+ List<String> lockHolderList = StorageEngine.getInstance().getLockInfo(storageGroupList);
+ addLockInfoToDataSet(storageGroupList, lockHolderList, listDataSet);
+ } catch (StorageEngineException e) {
+ throw new MetadataException(e);
+ }
+ return listDataSet;
+ }
+
+ private void addLockInfoToDataSet(
+ List<PartialPath> paths, List<String> lockHolderList, ListDataSet dataSet) {
+ for (int i = 0; i < paths.size(); i++) {
+ RowRecord record = new RowRecord(0);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(paths.get(i).getFullPath()));
+ record.addField(field);
+ field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(lockHolderList.get(i)));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ }
+
+ private QueryDataSet processShowTimeseries(
+ ShowTimeSeriesPlan showTimeSeriesPlan, QueryContext context) throws MetadataException {
+ return new ShowTimeseriesDataSet(showTimeSeriesPlan, context);
+ }
+
+ protected List<IStorageGroupMNode> getAllStorageGroupNodes() {
+ return IoTDB.metaManager.getAllStorageGroupNodes();
+ }
+
+ private QueryDataSet processShowTTLQuery(ShowTTLPlan showTTLPlan) {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_STORAGE_GROUP, false), new PartialPath(COLUMN_TTL, false)),
+ Arrays.asList(TSDataType.TEXT, TSDataType.INT64));
+ Set<PartialPath> selectedSgs = new HashSet<>(showTTLPlan.getStorageGroups());
+
+ List<IStorageGroupMNode> storageGroups = getAllStorageGroupNodes();
+ int timestamp = 0;
+ for (IStorageGroupMNode mNode : storageGroups) {
+ PartialPath sgName = mNode.getPartialPath();
+ if (!selectedSgs.isEmpty() && !selectedSgs.contains(sgName)) {
+ continue;
+ }
+ RowRecord rowRecord = new RowRecord(timestamp++);
+ Field sg = new Field(TSDataType.TEXT);
+ Field ttl;
+ sg.setBinaryV(new Binary(sgName.getFullPath()));
+ if (mNode.getDataTTL() != Long.MAX_VALUE) {
+ ttl = new Field(TSDataType.INT64);
+ ttl.setLongV(mNode.getDataTTL());
+ } else {
+ ttl = null;
+ }
+ rowRecord.addField(sg);
+ rowRecord.addField(ttl);
+ listDataSet.putRecord(rowRecord);
+ }
+
+ return listDataSet;
+ }
+
+ private QueryDataSet processShowVersion() {
+ SingleDataSet singleDataSet =
+ new SingleDataSet(
+ Collections.singletonList(new PartialPath(IoTDBConstant.COLUMN_VERSION, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(IoTDBConstant.VERSION));
+ RowRecord rowRecord = new RowRecord(0);
+ rowRecord.addField(field);
+ singleDataSet.setRecord(rowRecord);
+ return singleDataSet;
+ }
+
+ private QueryDataSet processShowFlushTaskInfo() {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_ITEM, false), new PartialPath(COLUMN_VALUE, false)),
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
+
+ int timestamp = 0;
+ addRowRecordForShowQuery(
+ listDataSet,
+ timestamp++,
+ "total number of flush tasks",
+ Integer.toString(FlushTaskPoolManager.getInstance().getTotalTasks()));
+ addRowRecordForShowQuery(
+ listDataSet,
+ timestamp++,
+ "number of working flush tasks",
+ Integer.toString(FlushTaskPoolManager.getInstance().getWorkingTasksNumber()));
+ addRowRecordForShowQuery(
+ listDataSet,
+ timestamp,
+ "number of waiting flush tasks",
+ Integer.toString(FlushTaskPoolManager.getInstance().getWaitingTasksNumber()));
+ return listDataSet;
+ }
+
+ private QueryDataSet processShowFunctions(ShowFunctionsPlan showPlan)
+ throws QueryProcessException {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_FUNCTION_NAME, false),
+ new PartialPath(COLUMN_FUNCTION_TYPE, false),
+ new PartialPath(COLUMN_FUNCTION_CLASS, false)),
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+
+ appendUDFs(listDataSet, showPlan);
+ appendNativeFunctions(listDataSet, showPlan);
+
+ listDataSet.sort(
+ (r1, r2) ->
+ String.CASE_INSENSITIVE_ORDER.compare(
+ r1.getFields().get(0).getStringValue(), r2.getFields().get(0).getStringValue()));
+ return listDataSet;
+ }
+
+ @SuppressWarnings("squid:S3776")
+ private void appendUDFs(ListDataSet listDataSet, ShowFunctionsPlan showPlan)
+ throws QueryProcessException {
+ for (UDFRegistrationInformation info :
+ UDFRegistrationService.getInstance().getRegistrationInformation()) {
+ RowRecord rowRecord = new RowRecord(0); // ignore timestamp
+ rowRecord.addField(Binary.valueOf(info.getFunctionName()), TSDataType.TEXT);
+ String functionType = "";
+ try {
+ if (info.isBuiltin()) {
+ if (info.isUDTF()) {
+ functionType = FUNCTION_TYPE_BUILTIN_UDTF;
+ } else if (info.isUDAF()) {
+ functionType = FUNCTION_TYPE_BUILTIN_UDAF;
+ }
+ } else {
+ if (info.isUDTF()) {
+ functionType = FUNCTION_TYPE_EXTERNAL_UDTF;
+ } else if (info.isUDAF()) {
+ functionType = FUNCTION_TYPE_EXTERNAL_UDAF;
+ }
+ }
+ } catch (InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException e) {
+ throw new QueryProcessException(e.toString());
+ }
+ rowRecord.addField(Binary.valueOf(functionType), TSDataType.TEXT);
+ rowRecord.addField(Binary.valueOf(info.getClassName()), TSDataType.TEXT);
+ listDataSet.putRecord(rowRecord);
+ }
+ }
+
+ private QueryDataSet processShowContinuousQueries() {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_NAME, false),
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL, false),
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL, false),
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_QUERY_SQL, false),
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_TARGET_PATH, false)),
+ Arrays.asList(
+ TSDataType.TEXT,
+ TSDataType.INT64,
+ TSDataType.INT64,
+ TSDataType.TEXT,
+ TSDataType.TEXT));
+
+ List<ShowContinuousQueriesResult> continuousQueriesList =
+ ContinuousQueryService.getInstance().getShowContinuousQueriesResultList();
+
+ for (ShowContinuousQueriesResult result : continuousQueriesList) {
+ RowRecord record = new RowRecord(0);
+ record.addField(Binary.valueOf(result.getContinuousQueryName()), TSDataType.TEXT);
+ record.addField(result.getEveryInterval(), TSDataType.INT64);
+ record.addField(result.getForInterval(), TSDataType.INT64);
+ record.addField(Binary.valueOf(result.getQuerySql()), TSDataType.TEXT);
+ record.addField(Binary.valueOf(result.getTargetPath().getFullPath()), TSDataType.TEXT);
+ listDataSet.putRecord(record);
+ }
+
+ return listDataSet;
+ }
+
+ private void appendNativeFunctions(ListDataSet listDataSet, ShowFunctionsPlan showPlan) {
+ final Binary functionType = Binary.valueOf(FUNCTION_TYPE_NATIVE);
+ final Binary className = Binary.valueOf("");
+ for (String functionName : SQLConstant.getNativeFunctionNames()) {
+ RowRecord rowRecord = new RowRecord(0); // ignore timestamp
+ rowRecord.addField(Binary.valueOf(functionName.toUpperCase()), TSDataType.TEXT);
+ rowRecord.addField(functionType, TSDataType.TEXT);
+ rowRecord.addField(className, TSDataType.TEXT);
+ listDataSet.putRecord(rowRecord);
+ }
+ }
+
+ private QueryDataSet processShowTriggers() {
+ return TriggerRegistrationService.getInstance().show();
+ }
+
+ private void addRowRecordForShowQuery(
+ ListDataSet listDataSet, int timestamp, String item, String value) {
+ RowRecord rowRecord = new RowRecord(timestamp);
+ Field itemField = new Field(TSDataType.TEXT);
+ itemField.setBinaryV(new Binary(item));
+ Field valueField = new Field(TSDataType.TEXT);
+ valueField.setBinaryV(new Binary(value));
+ rowRecord.addField(itemField);
+ rowRecord.addField(valueField);
+ listDataSet.putRecord(rowRecord);
+ }
+
+ @Override
+ public void delete(DeletePlan deletePlan) throws QueryProcessException {
+ AUDIT_LOGGER.info(
+ "delete data from {} in [{},{}]",
+ deletePlan.getPaths(),
+ deletePlan.getDeleteStartTime(),
+ deletePlan.getDeleteEndTime());
+ for (PartialPath path : deletePlan.getPaths()) {
+ delete(
+ path,
+ deletePlan.getDeleteStartTime(),
+ deletePlan.getDeleteEndTime(),
+ deletePlan.getIndex(),
+ deletePlan.getPartitionFilter());
+ }
+ }
+
+ private void operateLoadFiles(OperateFilePlan plan) throws QueryProcessException {
+ File file = plan.getFile();
+ if (!file.exists()) {
+ throw new QueryProcessException(
+ String.format("File path %s doesn't exists.", file.getPath()));
+ }
+ if (file.isDirectory()) {
+ loadDir(file, plan);
+ } else {
+ loadFile(file, plan);
+ }
+ }
+
+ private void loadDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
+ File[] files = curFile.listFiles();
+ long[] establishTime = new long[files.length];
+ List<Integer> tsfiles = new ArrayList<>();
+
+ for (int i = 0; i < files.length; i++) {
+ File file = files[i];
+ if (!file.isDirectory()) {
+ String fileName = file.getName();
+ if (fileName.endsWith(TSFILE_SUFFIX)) {
+ establishTime[i] = Long.parseLong(fileName.split(FILE_NAME_SEPARATOR)[0]);
+ tsfiles.add(i);
+ }
+ }
+ }
+ Collections.sort(
+ tsfiles,
+ (o1, o2) -> {
+ if (establishTime[o1] == establishTime[o2]) {
+ return 0;
+ }
+ return establishTime[o1] < establishTime[o2] ? -1 : 1;
+ });
+ for (Integer i : tsfiles) {
+ loadFile(files[i], plan);
+ }
+
+ for (File file : files) {
+ if (file.isDirectory()) {
+ loadDir(file, plan);
+ }
+ }
+ }
+
+ private void loadFile(File file, OperateFilePlan plan) throws QueryProcessException {
+ if (!file.getName().endsWith(TSFILE_SUFFIX)) {
+ return;
+ }
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setClosed(true);
+ try {
+ // check file
+ RestorableTsFileIOWriter restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
+ if (restorableTsFileIOWriter.hasCrashed()) {
+ restorableTsFileIOWriter.close();
+ throw new QueryProcessException(
+ String.format(
+ "Cannot load file %s because the file has crashed.", file.getAbsolutePath()));
+ }
+ Map<Path, IMeasurementSchema> schemaMap = new HashMap<>();
+
+ List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
+ reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
+ if (plan.getVerifyMetadata()) {
+ loadNewTsFileVerifyMetadata(reader);
+ }
+ } catch (IOException e) {
+ logger.warn("can not get timeseries metadata from {}.", file.getAbsoluteFile());
+ throw new QueryProcessException(e.getMessage());
+ }
+
+ FileLoaderUtils.checkTsFileResource(tsFileResource);
+ if (UpgradeUtils.isNeedUpgrade(tsFileResource)) {
+ throw new QueryProcessException(
+ String.format(
+ "Cannot load file %s because the file's version is old which needs to be upgraded.",
+ file.getAbsolutePath()));
+ }
+
+ // create schemas if they doesn't exist
+ if (plan.isAutoCreateSchema()) {
+ createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel());
+ }
+
+ List<TsFileResource> splitResources = new ArrayList();
+ if (tsFileResource.isSpanMultiTimePartitions()) {
+ logger.info(
+ "try to split the tsFile={} du to it spans multi partitions",
+ tsFileResource.getTsFile().getPath());
+ TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResources);
+ tsFileResource.writeLock();
+ tsFileResource.removeModFile();
+ tsFileResource.writeUnlock();
+ logger.info(
+ "after split, the old tsFile was split to {} new tsFiles", splitResources.size());
+ }
+
+ if (splitResources.isEmpty()) {
+ splitResources.add(tsFileResource);
+ }
+
+ for (TsFileResource resource : splitResources) {
+ StorageEngine.getInstance().loadNewTsFile(resource);
+ }
+ } catch (Exception e) {
+ logger.error("fail to load file {}", file.getName(), e);
+ throw new QueryProcessException(
+ String.format("Cannot load file %s because %s", file.getAbsolutePath(), e.getMessage()));
+ }
+ }
+
+ private void loadNewTsFileVerifyMetadata(TsFileSequenceReader tsFileSequenceReader)
+ throws MetadataException, QueryProcessException, IOException {
+ Map<String, List<TimeseriesMetadata>> metadataSet =
+ tsFileSequenceReader.getAllTimeseriesMetadata();
+ for (Map.Entry<String, List<TimeseriesMetadata>> entry : metadataSet.entrySet()) {
+ String deviceId = entry.getKey();
+ PartialPath devicePath = new PartialPath(deviceId);
+ if (!IoTDB.metaManager.isPathExist(devicePath)) {
+ continue;
+ }
+ for (TimeseriesMetadata metadata : entry.getValue()) {
+ PartialPath fullPath =
+ new PartialPath(deviceId + TsFileConstant.PATH_SEPARATOR + metadata.getMeasurementId());
+ if (IoTDB.metaManager.isPathExist(fullPath)) {
+ TSDataType dataType = IoTDB.metaManager.getSeriesType(fullPath);
+ if (dataType != metadata.getTSDataType()) {
+ throw new QueryProcessException(
+ fullPath.getFullPath()
+ + " is "
+ + metadata.getTSDataType().name()
+ + " in the loading TsFile but is "
+ + dataType.name()
+ + " in IoTDB.");
+ }
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ private void createSchemaAutomatically(
+ List<ChunkGroupMetadata> chunkGroupMetadataList,
+ Map<Path, IMeasurementSchema> knownSchemas,
+ int sgLevel)
+ throws MetadataException {
+ if (chunkGroupMetadataList.isEmpty()) {
+ return;
+ }
+
+ Set<PartialPath> registeredSeries = new HashSet<>();
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ String device = chunkGroupMetadata.getDevice();
+ Set<String> existSeriesSet = new HashSet<>();
+ PartialPath devicePath = new PartialPath(device);
+ PartialPath storageGroupPath = MetaUtils.getStorageGroupPathByLevel(devicePath, sgLevel);
+ try {
+ IoTDB.metaManager.setStorageGroup(storageGroupPath);
+ } catch (StorageGroupAlreadySetException alreadySetException) {
+ if (!alreadySetException.getStorageGroupPath().equals(storageGroupPath.getFullPath())) {
+ throw alreadySetException;
+ }
+ }
+ for (PartialPath path :
+ IoTDB.metaManager.getMeasurementPaths(devicePath.concatNode(ONE_LEVEL_PATH_WILDCARD))) {
+ existSeriesSet.add(path.getMeasurement());
+ existSeriesSet.add(path.getMeasurementAlias());
+ }
+ for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+ PartialPath series =
+ new PartialPath(
+ chunkGroupMetadata.getDevice()
+ + TsFileConstant.PATH_SEPARATOR
+ + chunkMetadata.getMeasurementUid());
+ if (!registeredSeries.contains(series)) {
+ registeredSeries.add(series);
+ IMeasurementSchema schema =
+ knownSchemas.get(new Path(series.getDevice(), series.getMeasurement()));
+ if (schema == null) {
+ throw new MetadataException(
+ String.format(
+ "Can not get the schema of measurement [%s]",
+ chunkMetadata.getMeasurementUid()));
+ }
+ if (!existSeriesSet.contains(chunkMetadata.getMeasurementUid())) {
+ IoTDB.metaManager.createTimeseries(
+ series,
+ schema.getType(),
+ schema.getEncodingType(),
+ schema.getCompressor(),
+ Collections.emptyMap());
+ }
+ }
+ }
+ }
+ }
+
+ private void operateRemoveFile(OperateFilePlan plan) throws QueryProcessException {
+ try {
+ if (!StorageEngine.getInstance().deleteTsfile(plan.getFile())) {
+ throw new QueryProcessException(
+ String.format("File %s doesn't exist.", plan.getFile().getName()));
+ }
+ } catch (StorageEngineException | IllegalPathException e) {
+ throw new QueryProcessException(
+ String.format("Cannot remove file because %s", e.getMessage()));
+ }
+ }
+
+ private void operateUnloadFile(OperateFilePlan plan) throws QueryProcessException {
+ if (!plan.getTargetDir().exists() || !plan.getTargetDir().isDirectory()) {
+ throw new QueryProcessException(
+ String.format("Target dir %s is invalid.", plan.getTargetDir().getPath()));
+ }
+ try {
+ if (!StorageEngine.getInstance().unloadTsfile(plan.getFile(), plan.getTargetDir())) {
+ throw new QueryProcessException(
+ String.format("File %s doesn't exist.", plan.getFile().getName()));
+ }
+ } catch (StorageEngineException | IllegalPathException e) {
+ throw new QueryProcessException(
+ String.format(
+ "Cannot unload file %s to target directory %s because %s",
+ plan.getFile().getPath(), plan.getTargetDir().getPath(), e.getMessage()));
+ }
+ }
+
+ private void operateTTL(SetTTLPlan plan) throws QueryProcessException {
+ try {
+ List<PartialPath> storageGroupPaths =
+ IoTDB.metaManager.getMatchedStorageGroups(plan.getStorageGroup());
+ for (PartialPath storagePath : storageGroupPaths) {
+ IoTDB.metaManager.setTTL(storagePath, plan.getDataTTL());
+ StorageEngine.getInstance().setTTL(storagePath, plan.getDataTTL());
+ }
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ } catch (IOException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void update(PartialPath path, long startTime, long endTime, String value) {
+ throw new UnsupportedOperationException("update is not supported now");
+ }
+
+ @Override
+ public void delete(
+ PartialPath path,
+ long startTime,
+ long endTime,
+ long planIndex,
+ TimePartitionFilter timePartitionFilter)
+ throws QueryProcessException {
+ try {
+ StorageEngine.getInstance().delete(path, startTime, endTime, planIndex, timePartitionFilter);
+ } catch (StorageEngineException e) {
+ throw new QueryProcessException(e);
+ }
+ }
+
+ private void checkFailedMeasurments(InsertPlan plan)
+ throws PathNotExistException, StorageEngineException {
+ // check if all path not exist exceptions
+ List<String> failedPaths = plan.getFailedMeasurements();
+ List<Exception> exceptions = plan.getFailedExceptions();
+ boolean isPathNotExistException = true;
+ for (Exception e : exceptions) {
+ Throwable curException = e;
+ while (curException.getCause() != null) {
+ curException = curException.getCause();
+ }
+ if (!(curException instanceof PathNotExistException)) {
+ isPathNotExistException = false;
+ break;
+ }
+ }
+ if (isPathNotExistException) {
+ throw new PathNotExistException(failedPaths);
+ } else {
+ throw new StorageEngineException(
+ INSERT_MEASUREMENTS_FAILED_MESSAGE
+ + plan.getFailedMeasurements()
+ + (!exceptions.isEmpty() ? (" caused by " + exceptions.get(0).getMessage()) : ""));
+ }
+ }
+
+ @Override
+ public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+ throws QueryProcessException {
+ if (insertRowsOfOneDevicePlan.getRowPlans().length == 0) {
+ return;
+ }
+ try {
+ // insert to storage engine
+ StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
+
+ List<String> notExistedPaths = null;
+ List<String> failedMeasurements = null;
+
+ // If there are some exceptions, we assume they caused by the same reason.
+ Exception exception = null;
+ for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+ if (plan.getFailedMeasurements() != null) {
+ if (notExistedPaths == null) {
+ notExistedPaths = new ArrayList<>();
+ failedMeasurements = new ArrayList<>();
+ }
+ // check if all path not exist exceptions
+ List<String> failedPaths = plan.getFailedMeasurements();
+ List<Exception> exceptions = plan.getFailedExceptions();
+ boolean isPathNotExistException = true;
+ for (Exception e : exceptions) {
+ exception = e;
+ Throwable curException = e;
+ while (curException.getCause() != null) {
+ curException = curException.getCause();
+ }
+ if (!(curException instanceof PathNotExistException)) {
+ isPathNotExistException = false;
+ break;
+ }
+ }
+ if (isPathNotExistException) {
+ notExistedPaths.addAll(failedPaths);
+ } else {
+ failedMeasurements.addAll(plan.getFailedMeasurements());
+ }
+ }
+ }
+ if (notExistedPaths != null && !notExistedPaths.isEmpty()) {
+ throw new PathNotExistException(notExistedPaths);
+ } else if (notExistedPaths != null && !failedMeasurements.isEmpty()) {
+ throw new StorageEngineException(
+ "failed to insert points "
+ + failedMeasurements
+ + (exception != null ? (" caused by " + exception.getMessage()) : ""));
+ }
+
+ } catch (StorageEngineException | MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ }
+
+ @Override
+ public void insert(InsertRowsPlan plan) throws QueryProcessException {
+ for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) {
+ if (plan.getResults().containsKey(i) || plan.isExecuted(i)) {
+ continue;
+ }
+ try {
+ insert(plan.getInsertRowPlanList().get(i));
+ } catch (QueryProcessException e) {
+ plan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!plan.getResults().isEmpty()) {
+ throw new BatchProcessException(plan.getFailingStatus());
+ }
+ }
+
+ @Override
+ public void insert(InsertRowPlan insertRowPlan) throws QueryProcessException {
+ try {
+ insertRowPlan.setMeasurementMNodes(
+ new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
+ // When insert data with sql statement, the data types will be null here.
+ // We need to predicted the data types first
+ if (insertRowPlan.getDataTypes()[0] == null) {
+ for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
+ insertRowPlan.getDataTypes()[i] =
+ TypeInferenceUtils.getPredictedDataType(
+ insertRowPlan.getValues()[i], insertRowPlan.isNeedInferType());
+ }
+ }
+
+ StorageEngine.getInstance().insert(insertRowPlan);
+
+ if (insertRowPlan.getFailedMeasurements() != null) {
+ checkFailedMeasurments(insertRowPlan);
+ }
+ } catch (StorageEngineException | MetadataException e) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ StatMonitor.getInstance().updateFailedStatValue();
+ }
+ throw new QueryProcessException(e);
+ } catch (Exception e) {
+ // update failed statistics
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ StatMonitor.getInstance().updateFailedStatValue();
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan)
+ throws QueryProcessException {
+ if (insertMultiTabletPlan.isEnableMultiThreading()) {
+ insertTabletParallel(insertMultiTabletPlan);
+ } else {
+ insertTabletSerial(insertMultiTabletPlan);
+ }
+ }
+
+ private void insertTabletSerial(InsertMultiTabletPlan insertMultiTabletPlan)
+ throws BatchProcessException {
+ for (int i = 0; i < insertMultiTabletPlan.getInsertTabletPlanList().size(); i++) {
+ if (insertMultiTabletPlan.getResults().containsKey(i)
+ || insertMultiTabletPlan.isExecuted(i)) {
+ continue;
+ }
+ try {
+ insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+ } catch (QueryProcessException e) {
+ insertMultiTabletPlan
+ .getResults()
+ .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!insertMultiTabletPlan.getResults().isEmpty()) {
+ throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+ }
+ }
+
+ private void insertTabletParallel(InsertMultiTabletPlan insertMultiTabletPlan)
+ throws BatchProcessException {
+ updateInsertTabletsPool(insertMultiTabletPlan.getDifferentStorageGroupsCount());
+
+ List<InsertTabletPlan> planList = insertMultiTabletPlan.getInsertTabletPlanList();
+ List<Future<?>> futureList = new ArrayList<>();
+
+ Map<Integer, TSStatus> results = insertMultiTabletPlan.getResults();
+
+ List<InsertTabletPlan> runPlanList = new ArrayList<>();
+ Map<Integer, Integer> runIndexToRealIndex = new HashMap<>();
+ for (int i = 0; i < planList.size(); i++) {
+ if (!(results.containsKey(i) || insertMultiTabletPlan.isExecuted(i))) {
+ runPlanList.add(planList.get(i));
+ runIndexToRealIndex.put(runPlanList.size() - 1, i);
+ }
+ }
+ for (InsertTabletPlan plan : runPlanList) {
+ Future<?> f =
+ insertionPool.submit(
+ () -> {
+ insertTablet(plan);
+ return null;
+ });
+ futureList.add(f);
+ }
+ for (int i = 0; i < futureList.size(); i++) {
+ try {
+ futureList.get(i).get();
+ } catch (Exception e) {
+ if (e.getCause() instanceof QueryProcessException) {
+ QueryProcessException qe = (QueryProcessException) e.getCause();
+ results.put(
+ runIndexToRealIndex.get(i), RpcUtils.getStatus(qe.getErrorCode(), qe.getMessage()));
+ } else {
+ results.put(
+ runIndexToRealIndex.get(i),
+ RpcUtils.getStatus(INTERNAL_SERVER_ERROR, e.getMessage()));
+ }
+ }
+ }
+
+ if (!results.isEmpty()) {
+ throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+ }
+ }
+
+ private void updateInsertTabletsPool(int sgSize) {
+ int updateCoreSize = Math.min(sgSize, Runtime.getRuntime().availableProcessors() / 2);
+ if (insertionPool == null || insertionPool.isTerminated()) {
+ insertionPool =
+ (ThreadPoolExecutor)
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ updateCoreSize, ThreadName.INSERTION_SERVICE.getName());
+ } else if (insertionPool.getCorePoolSize() > updateCoreSize) {
+ insertionPool.setCorePoolSize(updateCoreSize);
+ insertionPool.setMaximumPoolSize(updateCoreSize);
+ } else if (insertionPool.getCorePoolSize() < updateCoreSize) {
+ insertionPool.setMaximumPoolSize(updateCoreSize);
+ insertionPool.setCorePoolSize(updateCoreSize);
+ }
+ }
+
+ @Override
+ public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
+
+ if (insertTabletPlan.getRowCount() == 0) {
+ return;
+ }
+ try {
+ insertTabletPlan.setMeasurementMNodes(
+ new IMeasurementMNode[insertTabletPlan.getMeasurements().length]);
+
+ StorageEngine.getInstance().insertTablet(insertTabletPlan);
+
+ if (insertTabletPlan.getFailedMeasurements() != null) {
+ checkFailedMeasurments(insertTabletPlan);
+ }
+ } catch (StorageEngineException | MetadataException e) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ StatMonitor.getInstance().updateFailedStatValue();
+ }
+ throw new QueryProcessException(e);
+ } catch (Exception e) {
+ // update failed statistics
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ StatMonitor.getInstance().updateFailedStatValue();
+ }
+ throw e;
+ }
+ }
+
+ private boolean operateAuthor(AuthorPlan author) throws QueryProcessException {
+ AuthorOperator.AuthorType authorType = author.getAuthorType();
+ String userName = author.getUserName();
+ String roleName = author.getRoleName();
+ String password = author.getPassword();
+ String newPassword = author.getNewPassword();
+ Set<Integer> permissions = author.getPermissions();
+ PartialPath nodeName = author.getNodeName();
+ try {
+ switch (authorType) {
+ case UPDATE_USER:
+ authorizer.updateUserPassword(userName, newPassword);
+ break;
+ case CREATE_USER:
+ authorizer.createUser(userName, password);
+ break;
+ case CREATE_ROLE:
+ authorizer.createRole(roleName);
+ break;
+ case DROP_USER:
+ authorizer.deleteUser(userName);
+ break;
+ case DROP_ROLE:
+ authorizer.deleteRole(roleName);
+ break;
+ case GRANT_ROLE:
+ for (int i : permissions) {
+ authorizer.grantPrivilegeToRole(roleName, nodeName.getFullPath(), i);
+ }
+ break;
+ case GRANT_USER:
+ for (int i : permissions) {
+ authorizer.grantPrivilegeToUser(userName, nodeName.getFullPath(), i);
+ }
+ break;
+ case GRANT_ROLE_TO_USER:
+ authorizer.grantRoleToUser(roleName, userName);
+ break;
+ case REVOKE_USER:
+ for (int i : permissions) {
+ authorizer.revokePrivilegeFromUser(userName, nodeName.getFullPath(), i);
+ }
+ break;
+ case REVOKE_ROLE:
+ for (int i : permissions) {
+ authorizer.revokePrivilegeFromRole(roleName, nodeName.getFullPath(), i);
+ }
+ break;
+ case REVOKE_ROLE_FROM_USER:
+ authorizer.revokeRoleFromUser(roleName, userName);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported operation " + authorType);
+ }
+ } catch (AuthException e) {
+ throw new QueryProcessException(e.getMessage(), true);
+ }
+ return true;
+ }
+
+ private boolean operateWatermarkEmbedding(List<String> users, boolean useWatermark)
+ throws QueryProcessException {
+ try {
+ for (String user : users) {
+ authorizer.setUserUseWaterMark(user, useWatermark);
+ }
+ } catch (AuthException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ return true;
+ }
+
+ private boolean createTimeSeries(CreateTimeSeriesPlan createTimeSeriesPlan)
+ throws QueryProcessException {
+ try {
+ IoTDB.metaManager.createTimeseries(createTimeSeriesPlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean createAlignedTimeSeries(CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan)
+ throws QueryProcessException {
+ try {
+ IoTDB.metaManager.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ @SuppressWarnings("squid:S3776") // high Cognitive Complexity
+ private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan multiPlan)
+ throws BatchProcessException {
+ int dataTypeIdx = 0;
+ for (int i = 0; i < multiPlan.getPaths().size(); i++) {
+ if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) {
+ continue;
+ }
+ PartialPath path = multiPlan.getPaths().get(i);
+ String measurement = path.getMeasurement();
+ CreateTimeSeriesPlan plan =
+ new CreateTimeSeriesPlan(
+ multiPlan.getPaths().get(i),
+ multiPlan.getDataTypes().get(i),
+ multiPlan.getEncodings().get(i),
+ multiPlan.getCompressors().get(i),
+ multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+ multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+ multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+ multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+ dataTypeIdx++;
+ try {
+ createTimeSeries(plan);
+ } catch (QueryProcessException e) {
+ multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!multiPlan.getResults().isEmpty()) {
+ throw new BatchProcessException(multiPlan.getFailingStatus());
+ }
+ return true;
+ }
+
+ protected boolean deleteTimeSeries(DeleteTimeSeriesPlan deleteTimeSeriesPlan)
+ throws QueryProcessException {
+ AUDIT_LOGGER.info("delete timeseries {}", deleteTimeSeriesPlan.getPaths());
+ List<PartialPath> deletePathList = deleteTimeSeriesPlan.getPaths();
+ for (int i = 0; i < deletePathList.size(); i++) {
+ PartialPath path = deletePathList.get(i);
+ try {
+ StorageEngine.getInstance()
+ .deleteTimeseries(
+ path, deleteTimeSeriesPlan.getIndex(), deleteTimeSeriesPlan.getPartitionFilter());
+ String failed = IoTDB.metaManager.deleteTimeseries(path);
+ if (failed != null) {
+ deleteTimeSeriesPlan
+ .getResults()
+ .put(i, RpcUtils.getStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR, failed));
+ }
+ } catch (StorageEngineException | MetadataException e) {
+ deleteTimeSeriesPlan
+ .getResults()
+ .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!deleteTimeSeriesPlan.getResults().isEmpty()) {
+ throw new BatchProcessException(deleteTimeSeriesPlan.getFailingStatus());
+ }
+ return true;
+ }
+
+ private boolean alterTimeSeries(AlterTimeSeriesPlan alterTimeSeriesPlan)
+ throws QueryProcessException {
+ PartialPath path = alterTimeSeriesPlan.getPath();
+ Map<String, String> alterMap = alterTimeSeriesPlan.getAlterMap();
+ try {
+ switch (alterTimeSeriesPlan.getAlterType()) {
+ case RENAME:
+ String beforeName = alterMap.keySet().iterator().next();
+ String currentName = alterMap.get(beforeName);
+ IoTDB.metaManager.renameTagOrAttributeKey(beforeName, currentName, path);
+ break;
+ case SET:
+ IoTDB.metaManager.setTagsOrAttributesValue(alterMap, path);
+ break;
+ case DROP:
+ IoTDB.metaManager.dropTagsOrAttributes(alterMap.keySet(), path);
+ break;
+ case ADD_TAGS:
+ IoTDB.metaManager.addTags(alterMap, path);
+ break;
+ case ADD_ATTRIBUTES:
+ IoTDB.metaManager.addAttributes(alterMap, path);
+ break;
+ case UPSERT:
+ IoTDB.metaManager.upsertTagsAndAttributes(
+ alterTimeSeriesPlan.getAlias(),
+ alterTimeSeriesPlan.getTagsMap(),
+ alterTimeSeriesPlan.getAttributesMap(),
+ path);
+ break;
+ }
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ } catch (IOException e) {
+ throw new QueryProcessException(
+ String.format(
+ "Something went wrong while read/write the [%s]'s tag/attribute info.",
+ path.getFullPath()));
+ }
+ return true;
+ }
+
+ public boolean setStorageGroup(SetStorageGroupPlan setStorageGroupPlan)
+ throws QueryProcessException {
+ AUDIT_LOGGER.info("set storage group to {}", setStorageGroupPlan.getPaths());
+ PartialPath path = setStorageGroupPlan.getPath();
+ try {
+ IoTDB.metaManager.setStorageGroup(path);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ protected boolean deleteStorageGroups(DeleteStorageGroupPlan deleteStorageGroupPlan)
+ throws QueryProcessException {
+ AUDIT_LOGGER.info("delete storage group {}", deleteStorageGroupPlan.getPaths());
+ List<PartialPath> deletePathList = new ArrayList<>();
+ try {
+ for (PartialPath storageGroupPath : deleteStorageGroupPlan.getPaths()) {
+ List<PartialPath> allRelatedStorageGroupPath =
+ IoTDB.metaManager.getMatchedStorageGroups(storageGroupPath);
+ if (allRelatedStorageGroupPath.isEmpty()) {
+ throw new PathNotExistException(storageGroupPath.getFullPath(), true);
+ }
+ for (PartialPath path : allRelatedStorageGroupPath) {
+ StorageEngine.getInstance().deleteStorageGroup(path);
+ deletePathList.add(path);
+ }
+ }
+ IoTDB.metaManager.deleteStorageGroups(deletePathList);
+ operateClearCache();
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ protected QueryDataSet processAuthorQuery(AuthorPlan plan) throws QueryProcessException {
+ AuthorType authorType = plan.getAuthorType();
+ String userName = plan.getUserName();
+ String roleName = plan.getRoleName();
+ PartialPath path = plan.getNodeName();
+
+ ListDataSet dataSet;
+
+ try {
+ switch (authorType) {
+ case LIST_ROLE:
+ dataSet = executeListRole(plan);
+ break;
+ case LIST_USER:
+ dataSet = executeListUser(plan);
+ break;
+ case LIST_ROLE_USERS:
+ dataSet = executeListRoleUsers(roleName);
+ break;
+ case LIST_USER_ROLES:
+ dataSet = executeListUserRoles(userName);
+ break;
+ case LIST_ROLE_PRIVILEGE:
+ dataSet = executeListRolePrivileges(roleName, path);
+ break;
+ case LIST_USER_PRIVILEGE:
+ dataSet = executeListUserPrivileges(userName, path);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported operation " + authorType);
+ }
+ } catch (AuthException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ return dataSet;
+ }
+
+ private ListDataSet executeListRole(AuthorPlan plan) throws AuthException {
+ ListDataSet dataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_ROLE, false)),
+ Collections.singletonList(TSDataType.TEXT));
+
+ // check if current user is granted list_role privilege
+ boolean hasListRolePrivilege =
+ AuthorityChecker.check(
+ plan.getLoginUserName(),
+ Collections.emptyList(),
+ plan.getOperatorType(),
+ plan.getLoginUserName());
+ if (!hasListRolePrivilege) {
+ return dataSet;
+ }
+
+ List<String> roleList = authorizer.listAllRoles();
+ addToDataSet(roleList, dataSet);
+ return dataSet;
+ }
+
+ private void addToDataSet(List<String> strResults, ListDataSet dataSet) {
+ int index = 0;
+ for (String role : strResults) {
+ RowRecord record = new RowRecord(index++);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(role));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ }
+
+ private ListDataSet executeListUser(AuthorPlan plan) throws AuthException {
+ ListDataSet dataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_USER, false)),
+ Collections.singletonList(TSDataType.TEXT));
+
+ // check if current user is granted list_user privilege
+ boolean hasListUserPrivilege =
+ AuthorityChecker.check(
+ plan.getLoginUserName(),
+ Collections.singletonList((plan.getNodeName())),
+ plan.getOperatorType(),
+ plan.getLoginUserName());
+ if (!hasListUserPrivilege) {
+ return dataSet;
+ }
+
+ List<String> userList = authorizer.listAllUsers();
+ addToDataSet(userList, dataSet);
+ return dataSet;
+ }
+
+ private ListDataSet executeListRoleUsers(String roleName) throws AuthException {
+ Role role = authorizer.getRole(roleName);
+ if (role == null) {
+ throw new AuthException("No such role : " + roleName);
+ }
+ ListDataSet dataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_USER, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ List<String> userList = authorizer.listAllUsers();
+ int index = 0;
+ for (String userN : userList) {
+ User userObj = authorizer.getUser(userN);
+ if (userObj != null && userObj.hasRole(roleName)) {
+ RowRecord record = new RowRecord(index++);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(userN));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ }
+ return dataSet;
+ }
+
+ private ListDataSet executeListUserRoles(String userName) throws AuthException {
+ User user = authorizer.getUser(userName);
+ if (user != null) {
+ ListDataSet dataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_ROLE, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ int index = 0;
+ for (String roleN : user.getRoleList()) {
+ RowRecord record = new RowRecord(index++);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(roleN));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ return dataSet;
+ } else {
+ throw new AuthException("No such user : " + userName);
+ }
+ }
+
+ private ListDataSet executeListRolePrivileges(String roleName, PartialPath path)
+ throws AuthException {
+ Role role = authorizer.getRole(roleName);
+ if (role != null) {
+ List<PartialPath> headerList = new ArrayList<>();
+ List<TSDataType> typeList = new ArrayList<>();
+ headerList.add(new PartialPath(COLUMN_PRIVILEGE, false));
+ typeList.add(TSDataType.TEXT);
+ ListDataSet dataSet = new ListDataSet(headerList, typeList);
+ int index = 0;
+ for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+ if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+ RowRecord record = new RowRecord(index++);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(pathPrivilege.toString()));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ }
+ return dataSet;
+ } else {
+ throw new AuthException("No such role : " + roleName);
+ }
+ }
+
+ private ListDataSet executeListUserPrivileges(String userName, PartialPath path)
+ throws AuthException {
+ User user = authorizer.getUser(userName);
+ if (user == null) {
+ throw new AuthException("No such user : " + userName);
+ }
+ List<PartialPath> headerList = new ArrayList<>();
+ List<TSDataType> typeList = new ArrayList<>();
+ headerList.add(new PartialPath(COLUMN_ROLE, false));
+ headerList.add(new PartialPath(COLUMN_PRIVILEGE, false));
+ typeList.add(TSDataType.TEXT);
+ typeList.add(TSDataType.TEXT);
+ ListDataSet dataSet = new ListDataSet(headerList, typeList);
+ int index = 0;
+ for (PathPrivilege pathPrivilege : user.getPrivilegeList()) {
+ if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+ RowRecord record = new RowRecord(index++);
+ Field roleF = new Field(TSDataType.TEXT);
+ roleF.setBinaryV(new Binary(""));
+ record.addField(roleF);
+ Field privilegeF = new Field(TSDataType.TEXT);
+ privilegeF.setBinaryV(new Binary(pathPrivilege.toString()));
+ record.addField(privilegeF);
+ dataSet.putRecord(record);
+ }
+ }
+ for (String roleN : user.getRoleList()) {
+ Role role = authorizer.getRole(roleN);
+ if (role == null) {
+ continue;
+ }
+ for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+ if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+ RowRecord record = new RowRecord(index++);
+ Field roleF = new Field(TSDataType.TEXT);
+ roleF.setBinaryV(new Binary(roleN));
+ record.addField(roleF);
+ Field privilegeF = new Field(TSDataType.TEXT);
+ privilegeF.setBinaryV(new Binary(pathPrivilege.toString()));
+ record.addField(privilegeF);
+ dataSet.putRecord(record);
+ }
+ }
+ }
+ return dataSet;
+ }
+
+ @SuppressWarnings("unused") // for the distributed version
+ protected void loadConfiguration(LoadConfigurationPlan plan) throws QueryProcessException {
+ IoTDBDescriptor.getInstance().loadHotModifiedProps();
+ }
+
+ private QueryDataSet processShowMergeStatus() {
+ List<PartialPath> headerList = new ArrayList<>();
+ List<TSDataType> typeList = new ArrayList<>();
+ headerList.add(new PartialPath(COLUMN_STORAGE_GROUP, false));
+ headerList.add(new PartialPath(COLUMN_TASK_NAME, false));
+ headerList.add(new PartialPath(COLUMN_CREATED_TIME, false));
+ headerList.add(new PartialPath(COLUMN_PROGRESS, false));
+ headerList.add(new PartialPath(COLUMN_CANCELLED, false));
+ headerList.add(new PartialPath(COLUMN_DONE, false));
+
+ typeList.add(TSDataType.TEXT);
+ typeList.add(TSDataType.TEXT);
+ typeList.add(TSDataType.TEXT);
+ typeList.add(TSDataType.TEXT);
+ typeList.add(TSDataType.BOOLEAN);
+ typeList.add(TSDataType.BOOLEAN);
+ ListDataSet dataSet = new ListDataSet(headerList, typeList);
+ Map<String, List<TaskStatus>>[] taskStatus = MergeManager.getINSTANCE().collectTaskStatus();
+ for (Map<String, List<TaskStatus>> statusMap : taskStatus) {
+ for (Entry<String, List<TaskStatus>> stringListEntry : statusMap.entrySet()) {
+ for (TaskStatus status : stringListEntry.getValue()) {
+ dataSet.putRecord(toRowRecord(status, stringListEntry.getKey()));
+ }
+ }
+ }
+ return dataSet;
+ }
+
+ public RowRecord toRowRecord(TaskStatus status, String storageGroup) {
+ RowRecord record = new RowRecord(0);
+ record.addField(new Binary(storageGroup), TSDataType.TEXT);
+ record.addField(new Binary(status.getTaskName()), TSDataType.TEXT);
+ record.addField(new Binary(status.getCreatedTime()), TSDataType.TEXT);
+ record.addField(new Binary(status.getProgress()), TSDataType.TEXT);
+ record.addField(status.isCancelled(), TSDataType.BOOLEAN);
+ record.addField(status.isDone(), TSDataType.BOOLEAN);
+ return record;
+ }
+
+ private QueryDataSet processShowQueryProcesslist() {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(new PartialPath(QUERY_ID, false), new PartialPath(STATEMENT, false)),
+ Arrays.asList(TSDataType.INT64, TSDataType.TEXT));
+ QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+ for (Entry<Long, QueryContext> context : queryTimeManager.getQueryContextMap().entrySet()) {
+ RowRecord record = new RowRecord(context.getValue().getStartTime());
+ record.addField(context.getKey(), TSDataType.INT64);
+ record.addField(new Binary(context.getValue().getStatement()), TSDataType.TEXT);
+ listDataSet.putRecord(record);
+ }
+ return listDataSet;
+ }
+
+ /**
+ * @param storageGroups the storage groups to check
+ * @return List of PartialPath the storage groups that not exist
+ */
+ List<PartialPath> checkStorageGroupExist(List<PartialPath> storageGroups) {
+ List<PartialPath> noExistSg = new ArrayList<>();
+ if (storageGroups == null) {
+ return noExistSg;
+ }
+ for (PartialPath storageGroup : storageGroups) {
+ if (!IoTDB.metaManager.isStorageGroup(storageGroup)) {
+ noExistSg.add(storageGroup);
+ }
+ }
+ return noExistSg;
+ }
+
+ private void settle(SettlePlan plan) throws StorageEngineException {
+ if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+ throw new StorageEngineException(
+ "Current system mode is read only, does not support file settle");
+ }
+ if (!SettleService.getINSTANCE().isRecoverFinish()) {
+ throw new StorageEngineException("Existing sg that is not ready, please try later.");
+ }
+ PartialPath sgPath = null;
+ try {
+ List<TsFileResource> seqResourcesToBeSettled = new ArrayList<>();
+ List<TsFileResource> unseqResourcesToBeSettled = new ArrayList<>();
+ List<String> tsFilePaths = new ArrayList<>();
+ if (plan.isSgPath()) {
+ sgPath = plan.getSgPath();
+ } else {
+ String tsFilePath = plan.getTsFilePath();
+ if (new File(tsFilePath).isDirectory()) {
+ throw new WriteProcessException("The file should not be a directory.");
+ } else if (!new File(tsFilePath).exists()) {
+ throw new WriteProcessException("The tsFile " + tsFilePath + " is not existed.");
+ }
+ sgPath = SettleService.getINSTANCE().getSGByFilePath(tsFilePath);
+ tsFilePaths.add(tsFilePath);
+ }
+ StorageEngine.getInstance()
+ .getResourcesToBeSettled(
+ sgPath, seqResourcesToBeSettled, unseqResourcesToBeSettled, tsFilePaths);
+ SettleService.getINSTANCE().startSettling(seqResourcesToBeSettled, unseqResourcesToBeSettled);
+ StorageEngine.getInstance().setSettling(sgPath, false);
+ } catch (WriteProcessException e) {
+ if (sgPath != null) {
+ StorageEngine.getInstance().setSettling(sgPath, false);
+ }
+ throw new StorageEngineException(e.getMessage());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
index 6f8e823a4e..c9bcdf1cd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -33,8 +33,6 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -44,6 +42,8 @@ import org.apache.iotdb.db.query.control.QueryTimeManager;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.SessionTimeoutManager;
import org.apache.iotdb.db.query.control.tracing.TracingManager;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 27bba9d092..11a2026e5c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -39,10 +39,10 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
index 9766bfc85b..860bdd0ab7 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
index 526226884f..e3e21b1cb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
index e70e5ee404..21284733fe 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -34,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
index 2a596895e9..60ba99f132 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
@@ -26,10 +26,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
index 8aeb162b48..a449f92ff0 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
@@ -28,10 +28,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
index 329e4bf090..f7fd194709 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
@@ -28,10 +28,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
index 5677ad27c8..4f8e894bb8 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
index 062bbc3611..db9584f819 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
@@ -23,10 +23,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
index 75213a288a..bd0cb839cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
index d33d1c9230..3090f5b070 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
@@ -36,6 +36,8 @@ import java.nio.ByteBuffer;
/** Unit tests of AggregateResult without desc aggregate result. */
public class AggregateResultTest {
+ private static final double maxError = 0.0001d;
+
@Test
public void avgAggrResultTest() throws QueryProcessException, IOException {
AggregateResult avgAggrResult1 =
@@ -291,4 +293,34 @@ public class AggregateResultTest {
AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
Assert.assertEquals(2d, (double) result.getResult(), 0.01);
}
+
+ @Test
+ public void validityAggrResultTest() throws QueryProcessException, IOException {
+ AggregateResult validityAggrResult1 =
+ AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE, true);
+ AggregateResult validityAggrResult2 =
+ AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE, true);
+
+ Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics1.update(1L, 1d);
+ statistics1.update(2L, 1d);
+ statistics2.update(3L, 1d);
+ statistics2.update(4L, 3d);
+
+ validityAggrResult1.updateResultFromStatistics(statistics1);
+ validityAggrResult2.updateResultFromStatistics(statistics2);
+ // System.out.println(validityAggrResult1.getResult());
+ // System.out.println(validityAggrResult2.getResult());
+ validityAggrResult1.merge(validityAggrResult2);
+ // System.out.println(validityAggrResult1.getResult());
+
+ // Assert.assertEquals(0.25, (double) validityAggrResult1.getResult(), maxError);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ validityAggrResult1.serializeTo(outputStream);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+ AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
+ Assert.assertEquals(0.25, (double) result.getResult(), maxError);
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
index d038b3467f..35fab752ab 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
@@ -21,170 +21,47 @@ package org.apache.iotdb.db.query.aggregation;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.query.aggregation.impl.ValidityAggrResult;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.junit.Assert;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
/** Unit tests of desc aggregate result. */
public class DescAggregateResultTest {
@Test
- public void maxTimeDescAggrResultTest() throws QueryProcessException, IOException {
- AggregateResult maxTimeDescAggrResult =
- AggregateResultFactory.getAggrResultByName(SQLConstant.MAX_TIME, TSDataType.FLOAT, false);
-
- Statistics statistics1 = Statistics.getStatsByType(TSDataType.FLOAT);
- Statistics statistics2 = Statistics.getStatsByType(TSDataType.FLOAT);
- statistics1.update(10L, 10.0f);
- statistics2.update(1L, 1.0f);
-
- maxTimeDescAggrResult.updateResultFromStatistics(statistics1);
- Assert.assertEquals(10L, (long) maxTimeDescAggrResult.getResult());
- maxTimeDescAggrResult.updateResultFromStatistics(statistics2);
- Assert.assertEquals(10L, (long) maxTimeDescAggrResult.getResult());
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- maxTimeDescAggrResult.serializeTo(outputStream);
- ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
- AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
- Assert.assertEquals(10L, (long) result.getResult());
-
- maxTimeDescAggrResult.reset();
- BatchData batchData = BatchDataFactory.createBatchData(TSDataType.FLOAT, false, false);
- batchData.putFloat(1, 1.0F);
- batchData.putFloat(2, 2.0F);
- batchData.putFloat(3, 3.0F);
- batchData.putFloat(4, 4.0F);
- batchData.putFloat(5, 5.0F);
- batchData.resetBatchData();
- IBatchDataIterator it = batchData.getBatchDataIterator();
- maxTimeDescAggrResult.updateResultFromPageData(it);
- Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
- it.reset();
- maxTimeDescAggrResult.updateResultFromPageData(it, 2, 5);
- Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
- }
-
- @Test
- public void minTimeDescAggrResultTest() throws QueryProcessException, IOException {
- AggregateResult minTimeDescAggrResult =
- AggregateResultFactory.getAggrResultByName(SQLConstant.MIN_TIME, TSDataType.FLOAT, false);
-
- Statistics statistics1 = Statistics.getStatsByType(TSDataType.FLOAT);
- Statistics statistics2 = Statistics.getStatsByType(TSDataType.FLOAT);
- statistics1.update(10L, 10.0f);
- statistics2.update(1L, 1.0f);
-
- minTimeDescAggrResult.updateResultFromStatistics(statistics1);
- Assert.assertEquals(10L, (long) minTimeDescAggrResult.getResult());
- minTimeDescAggrResult.updateResultFromStatistics(statistics2);
- Assert.assertEquals(1L, (long) minTimeDescAggrResult.getResult());
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- minTimeDescAggrResult.serializeTo(outputStream);
- ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
- AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
- Assert.assertEquals(1L, (long) result.getResult());
-
- minTimeDescAggrResult.reset();
- BatchData batchData = BatchDataFactory.createBatchData(TSDataType.FLOAT, false, false);
- batchData.putFloat(1, 1.0F);
- batchData.putFloat(2, 2.0F);
- batchData.putFloat(3, 3.0F);
- batchData.putFloat(4, 4.0F);
- batchData.putFloat(5, 5.0F);
- batchData.resetBatchData();
- IBatchDataIterator it = batchData.getBatchDataIterator();
- minTimeDescAggrResult.updateResultFromPageData(it);
- Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
- it.reset();
- minTimeDescAggrResult.updateResultFromPageData(it, 1, 3);
- Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
- }
-
- @Test
- public void firstValueDescAggrResultTest() throws QueryProcessException, IOException {
- AggregateResult firstValueDescAggrResult =
- AggregateResultFactory.getAggrResultByName(
- SQLConstant.FIRST_VALUE, TSDataType.BOOLEAN, false);
-
- Statistics statistics1 = Statistics.getStatsByType(TSDataType.BOOLEAN);
- Statistics statistics2 = Statistics.getStatsByType(TSDataType.BOOLEAN);
- statistics1.update(10L, true);
- statistics2.update(1L, false);
-
- firstValueDescAggrResult.updateResultFromStatistics(statistics1);
- Assert.assertEquals(true, firstValueDescAggrResult.getResult());
- firstValueDescAggrResult.updateResultFromStatistics(statistics2);
- Assert.assertEquals(false, firstValueDescAggrResult.getResult());
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- firstValueDescAggrResult.serializeTo(outputStream);
- ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
- AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
- Assert.assertEquals(false, result.getResult());
-
- firstValueDescAggrResult.reset();
- BatchData batchData = BatchDataFactory.createBatchData(TSDataType.BOOLEAN, false, false);
- batchData.putBoolean(1, true);
- batchData.putBoolean(2, false);
- batchData.putBoolean(3, false);
- batchData.putBoolean(4, true);
- batchData.putBoolean(5, false);
- batchData.resetBatchData();
- IBatchDataIterator it = batchData.getBatchDataIterator();
- firstValueDescAggrResult.updateResultFromPageData(it);
- Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
- it.reset();
- firstValueDescAggrResult.updateResultFromPageData(it, 1, 3);
- Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
- }
-
- @Test
- public void lastValueDescAggrResultTest() throws QueryProcessException, IOException {
- AggregateResult lastValueDescAggrResult =
- AggregateResultFactory.getAggrResultByName(SQLConstant.LAST_VALUE, TSDataType.TEXT, false);
-
- Statistics statistics1 = Statistics.getStatsByType(TSDataType.TEXT);
- Statistics statistics2 = Statistics.getStatsByType(TSDataType.TEXT);
- statistics1.update(10L, new Binary("last"));
- statistics2.update(1L, new Binary("first"));
-
- lastValueDescAggrResult.updateResultFromStatistics(statistics1);
- Assert.assertEquals("last", String.valueOf(lastValueDescAggrResult.getResult()));
- lastValueDescAggrResult.updateResultFromStatistics(statistics2);
- Assert.assertEquals("last", String.valueOf(lastValueDescAggrResult.getResult()));
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- lastValueDescAggrResult.serializeTo(outputStream);
- ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
- AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
- Assert.assertEquals("last", String.valueOf(result.getResult()));
-
- lastValueDescAggrResult.reset();
- BatchData batchData = BatchDataFactory.createBatchData(TSDataType.TEXT, false, false);
- batchData.putBinary(1L, new Binary("a"));
- batchData.putBinary(2L, new Binary("b"));
- batchData.putBinary(3L, new Binary("c"));
- batchData.putBinary(4L, new Binary("d"));
- batchData.putBinary(5L, new Binary("e"));
+ public void validityMergeTest() throws QueryProcessException, IOException {
+ AggregateResult ValidityAggrResult =
+ AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE, true);
+
+ System.out.println(Runtime.getRuntime().freeMemory() / 1024 / 1024);
+
+ BatchData batchData = BatchDataFactory.createBatchData(TSDataType.DOUBLE, true, true);
+ BatchData batchData2 = BatchDataFactory.createBatchData(TSDataType.DOUBLE, true, true);
+
+ for (int i = 0; i < 3000; i++) {
+ batchData.putDouble(1623311071000L - 3000 + i, 0d);
+ }
+ batchData.putDouble(1623311071000L, 0d);
+ batchData.putDouble(1623312051000L, 2d);
+ batchData.putDouble(1623312053000L, 2d);
+ batchData.putDouble(1623312055000L, 2d);
+ batchData.putDouble(1623312057000L, 2d);
+ for (int i = 0; i < 7024; i++) {
+ batchData2.putDouble(1623312057000L + i, 0d);
+ }
batchData.resetBatchData();
+ batchData2.resetBatchData();
IBatchDataIterator it = batchData.getBatchDataIterator();
- lastValueDescAggrResult.updateResultFromPageData(it);
- Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
+ IBatchDataIterator it2 = batchData2.getBatchDataIterator();
+ ValidityAggrResult.updateResultFromPageData(it);
it.reset();
- lastValueDescAggrResult.updateResultFromPageData(it, 3, 5);
- Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
+ ValidityAggrResult.updateResultFromPageData(it2);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
index 07fad6d9df..92171f832c 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
index 14873748bd..8909c62d82 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
index 8f9a15b1c1..6aa4073366 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
index 34d7edc939..23ed66971f 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
index ba6578af53..141673fe59 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
index 03cd7c431c..1602a3f207 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
index 30f3468b8b..05f999139b 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
index 522518c5ea..5d7193fc8d 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
index 864b4c0048..b49d9a5f1c 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
@@ -29,8 +29,8 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.tools.TsFileRewriteTool;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;