You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/01/16 09:34:38 UTC
[iotdb] branch research/outlier updated: Add Distance-based Outlier Detection Function (#8885)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch research/outlier
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/outlier by this push:
new eb39238007 Add Distance-based Outlier Detection Function (#8885)
eb39238007 is described below
commit eb392380070318e2db19cafaee619d50fd0e5cd6
Author: iotdb-lsmod <12...@users.noreply.github.com>
AuthorDate: Mon Jan 16 17:34:29 2023 +0800
Add Distance-based Outlier Detection Function (#8885)
---
.../resources/conf/iotdb-engine.properties | 20 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 40 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 59 +
.../apache/iotdb/db/cq/ContinuousQueryTask.java | 2 +-
.../trigger/sink/local/LocalIoTDBHandler.java | 4 +-
.../iotdb/db/metadata/path/MeasurementPath.java | 17 +-
.../apache/iotdb/db/metadata/path/PartialPath.java | 9 +
.../apache/iotdb/db/qp/constant/SQLConstant.java | 4 +-
.../qp/logical/crud/AggregationQueryOperator.java | 4 +-
.../iotdb/db/qp/logical/crud/QueryOperator.java | 2 +-
.../iotdb/db/qp/logical/crud/SelectComponent.java | 14 +
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 9 +
.../db/query/aggregation/AggregateResult.java | 13 +
.../db/query/aggregation/AggregationType.java | 8 +-
.../db/query/aggregation/impl/DoddsAggrResult.java | 233 +++
.../db/query/executor/AggregationExecutor.java | 839 +++++++-
.../db/query/factory/AggregateResultFactory.java | 6 +
.../db/query/filter/executor/IPlanExecutor.java | 134 ++
.../db/query/filter/executor/PlanExecutor.java | 2176 ++++++++++++++++++++
.../db/service/basic/BasicServiceProvider.java | 4 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 2 +-
.../idtable/IDTableResourceControlTest.java | 2 +-
.../db/metadata/idtable/IDTableRestartTest.java | 2 +-
.../db/metadata/idtable/InsertWithIDTableTest.java | 2 +-
.../db/metadata/idtable/LastQueryWithIDTable.java | 2 +-
.../QueryAlignedTimeseriesWithIDTableTest.java | 2 +-
.../db/metadata/idtable/QueryWithIDTableTest.java | 2 +-
.../iotdb/db/qp/physical/InsertRowPlanTest.java | 2 +-
.../db/qp/physical/InsertTabletMultiPlanTest.java | 2 +-
.../iotdb/db/qp/physical/InsertTabletPlanTest.java | 2 +-
.../db/query/aggregation/AggregateResultTest.java | 34 +
.../query/aggregation/DescAggregateResultTest.java | 195 +-
.../dataset/EngineDataSetWithValueFilterTest.java | 4 +-
.../iotdb/db/query/dataset/ListDataSetTest.java | 4 +-
.../iotdb/db/query/dataset/SingleDataSetTest.java | 4 +-
.../query/dataset/UDTFAlignByTimeDataSetTest.java | 4 +-
.../dataset/groupby/GroupByFillDataSetTest.java | 4 +-
.../dataset/groupby/GroupByLevelDataSetTest.java | 4 +-
.../dataset/groupby/GroupByTimeDataSetTest.java | 4 +-
.../valuefilter/RawQueryWithValueFilterTest.java | 4 +-
.../iotdb/db/utils/TsFileRewriteToolTest.java | 4 +-
41 files changed, 3651 insertions(+), 231 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 13becfd599..0b1026410d 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -236,7 +236,7 @@ timestamp_precision=ms
# Whether to timed flush unsequence tsfiles' memtables.
# Datatype: boolean
-# enable_timed_flush_unseq_memtable=true
+enable_timed_flush_unseq_memtable=false
# If a memTable's created time is older than current time minus this, the memtable will be flushed to disk.
# Only check unsequence tsfiles' memtables.
@@ -418,15 +418,15 @@ timestamp_precision=ms
####################
# sequence space compaction: only compact the sequence files
# Datatype: boolean
-# enable_seq_space_compaction=true
+enable_seq_space_compaction=false
# unsequence space compaction: only compact the unsequence files
# Datatype: boolean
-# enable_unseq_space_compaction=true
+enable_unseq_space_compaction=false
# cross space compaction: compact the unsequence files into the overlapped sequence files
# Datatype: boolean
-# enable_cross_space_compaction=true
+enable_cross_space_compaction=false
# the strategy of inner space compaction task
# Options: inplace_compaction
@@ -721,9 +721,12 @@ timestamp_precision=ms
# Datatype: int [xsy]
# page_size_in_byte=65536
+# xMax = 10000000
+# xMin = -1000000
+
# The maximum number of data points in a page, default 1024*1024
# Datatype: int [xsy]
-# max_number_of_points_in_page=1048576
+max_number_of_points_in_page=60
# Max size limitation of input string
# Datatype: int [xsy]
@@ -953,4 +956,9 @@ timestamp_precision=ms
### Group By Fill Configuration
####################
# Datatype: float
-# group_by_fill_cache_size_in_mb=1.0
\ No newline at end of file
+# group_by_fill_cache_size_in_mb=1.0
+
+# Datatype: double
+bucket_width = 2
+window_size = 60000
+file_num = 2
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4842bb27e6..b674a277a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -283,11 +283,11 @@ public class IoTDBConfig {
/**
* If we enable the memory-control mechanism during index building , {@code indexBufferSize}
* refers to the byte-size of memory buffer threshold. For each index processor, all indexes in
- * one {@linkplain org.apache.iotdb.db.index.IndexFileProcessor IndexFileProcessor} share a total
- * common buffer size. With the memory-control mechanism, the occupied memory of all raw data and
- * index structures will be counted. If the memory buffer size reaches this threshold, the indexes
- * will be flushed to the disk file. As a result, data in one series may be divided into more than
- * one part and indexed separately. Unit: byte
+ * one {@linkplain org.apache.iotdb.db.index IndexFileProcessor} share a total common buffer size.
+ * With the memory-control mechanism, the occupied memory of all raw data and index structures
+ * will be counted. If the memory buffer size reaches this threshold, the indexes will be flushed
+ * to the disk file. As a result, data in one series may be divided into more than one part and
+ * indexed separately. Unit: byte
*/
private long indexBufferSize = 128 * 1024 * 1024L;
@@ -802,6 +802,36 @@ public class IoTDBConfig {
*/
private boolean enableIDTableLogFile = false;
+ public double getGamma() {
+ return gamma;
+ }
+
+ public void setGamma(double gamma) {
+ this.gamma = gamma;
+ }
+
+ public int getFileNum() {
+ return fileNum;
+ }
+
+ public void setFileNum(int fileNum) {
+ this.fileNum = fileNum;
+ }
+
+ public int getDelta() {
+ return delta;
+ }
+
+ public void setDelta(int delta) {
+ this.delta = delta;
+ }
+
+ private double gamma = 2;
+
+ private int fileNum = 1;
+
+ private int delta = 60000;
+
public IoTDBConfig() {
// empty constructor
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 849fddbdb6..f8311a7e72 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -829,6 +829,16 @@ public class IoTDBDescriptor {
properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
+ conf.setGamma(
+ Double.parseDouble(
+ properties.getProperty("bucket_width", Double.toString(conf.getGamma()))));
+ conf.setFileNum(
+ Integer.parseInt(
+ properties.getProperty("file_num", Integer.toString(conf.getFileNum()))));
+ conf.setDelta(
+ Integer.parseInt(
+ properties.getProperty("window_size", Integer.toString(conf.getDelta()))));
+
// timed flush memtable, timed close tsfile
loadTimedService(properties);
@@ -974,6 +984,55 @@ public class IoTDBDescriptor {
"group_size_in_byte",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setXMax(
+ Double.parseDouble(
+ properties.getProperty(
+ "xMax",
+ Double.toString(TSFileDescriptor.getInstance().getConfig().getXMax()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setXMin(
+ Double.parseDouble(
+ properties.getProperty(
+ "xMin",
+ Double.toString(TSFileDescriptor.getInstance().getConfig().getXMin()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setsMax(
+ Double.parseDouble(
+ properties.getProperty(
+ "sMax",
+ Double.toString(TSFileDescriptor.getInstance().getConfig().getsMax()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setSmin(
+ Double.parseDouble(
+ properties.getProperty(
+ "sMin",
+ Double.toString(TSFileDescriptor.getInstance().getConfig().getSmin()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setMussRate(
+ Double.parseDouble(
+ properties.getProperty(
+ "mussRate",
+ Double.toString(TSFileDescriptor.getInstance().getConfig().getMussRate()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setUsePreSpeed(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "usePreSpeed",
+ Boolean.toString(TSFileDescriptor.getInstance().getConfig().isUsePreSpeed()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setUsePreRange(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "usePreRange",
+ Boolean.toString(TSFileDescriptor.getInstance().getConfig().isUsePreRange()))));
TSFileDescriptor.getInstance()
.getConfig()
.setPageSizeInByte(
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
index 154c284fe8..6216487e5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.logical.crud.GroupByClauseComponent;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.logical.crud.SelectComponent;
@@ -34,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
index 3b224b20e5..b4f40e1f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
@@ -27,10 +27,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index b481e478a5..756ce3ac5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -55,10 +56,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
public class MeasurementPath extends PartialPath {
@@ -93,6 +91,17 @@ public class MeasurementPath extends PartialPath {
this.measurementSchema = measurementSchema;
}
+ public MeasurementPath concat(String[] otherNodes) {
+ int len = nodes.length;
+ String[] newNodes = Arrays.copyOf(nodes, nodes.length + otherNodes.length);
+ System.arraycopy(otherNodes, 0, newNodes, len, otherNodes.length);
+ MeasurementPath measurementPath = new MeasurementPath();
+ measurementPath.nodes = newNodes;
+ measurementPath.fullPath = this.fullPath;
+ measurementPath.fullPath = String.join(TsFileConstant.PATH_SEPARATOR, nodes);
+ return measurementPath;
+ }
+
public IMeasurementSchema getMeasurementSchema() {
return measurementSchema;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index 6dbee2febd..9e50cf1fde 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -125,6 +125,15 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
fullPath = String.join(TsFileConstant.PATH_SEPARATOR, nodes);
}
+ public PartialPath concat(String[] otherNodes) {
+ int len = nodes.length;
+ String[] newNodes = Arrays.copyOf(nodes, nodes.length + otherNodes.length);
+ System.arraycopy(otherNodes, 0, newNodes, len, otherNodes.length);
+ PartialPath partialPath = new PartialPath(newNodes);
+ partialPath.fullPath = String.join(TsFileConstant.PATH_SEPARATOR, nodes);
+ return partialPath;
+ }
+
public PartialPath concatNode(String node) {
String[] newPathNodes = Arrays.copyOf(nodes, nodes.length + 1);
newPathNodes[newPathNodes.length - 1] = node;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index ce111ab37a..a4ab046546 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -70,6 +70,7 @@ public class SQLConstant {
public static final String COUNT = "count";
public static final String AVG = "avg";
public static final String SUM = "sum";
+ public static final String DODDS = "dodds";
public static final String ALL = "all";
@@ -85,7 +86,8 @@ public class SQLConstant {
LAST_VALUE,
COUNT,
SUM,
- AVG));
+ AVG,
+ DODDS));
public static final int TOK_WHERE = 23;
public static final int TOK_INSERT = 24;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
index 36719c71ec..9af0487ccb 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
@@ -125,7 +125,9 @@ public class AggregationQueryOperator extends QueryOperator {
protected AggregationPlan initAggregationPlan(QueryPlan queryPlan) throws QueryProcessException {
AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
- aggregationPlan.setAggregations(selectComponent.getAggregationFunctions());
+ aggregationPlan.setAggregations(selectComponent.getAggregationFunctions()); // function name
+ aggregationPlan.setParameters(selectComponent.getAggregationAttributes());
+ // TODO: aggregationPlan.setParams(selectComponent.getParams)
if (isGroupByLevel()) {
initGroupByLevel(aggregationPlan);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 4653ed6794..5f41e24902 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -60,7 +60,7 @@ public class QueryOperator extends Operator {
protected WhereComponent whereComponent;
protected SpecialClauseComponent specialClauseComponent;
- protected Map<String, Object> props;
+ protected Map<String, Object> props; // parameters
protected IndexType indexType;
protected boolean enableTracing;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
index f06d66158c..0c3b64d505 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/** this class maintains information from select clause. */
public final class SelectComponent {
@@ -128,8 +129,21 @@ public final class SelectComponent {
expression instanceof FunctionExpression
? ((FunctionExpression) resultColumn.getExpression()).getFunctionName()
: null);
+ // TODO: resultColumn.getExpression().getFunctionAttributes()(
}
}
return aggregationFunctionsCache;
}
+
+ public List<Map<String, String>> getAggregationAttributes() {
+ List<Map<String, String>> aggregationAttributesCache = new ArrayList<>();
+ for (ResultColumn resultColumn : resultColumns) {
+ Expression expression = resultColumn.getExpression();
+ aggregationAttributesCache.add(
+ expression instanceof FunctionExpression
+ ? ((FunctionExpression) resultColumn.getExpression()).getFunctionAttributes()
+ : null);
+ }
+ return aggregationAttributesCache;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index c81e12e5f8..fec29da6cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -46,6 +46,7 @@ public class AggregationPlan extends RawDataQueryPlan {
private List<String> aggregations = new ArrayList<>();
private List<String> deduplicatedAggregations = new ArrayList<>();
+ private List<Map<String, String>> parameters = new ArrayList<>();
private int[] levels;
private GroupByLevelController groupByLevelController;
@@ -106,6 +107,14 @@ public class AggregationPlan extends RawDataQueryPlan {
this.aggregations = aggregations;
}
+ public List<Map<String, String>> getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(List<Map<String, String>> parameters) {
+ this.parameters = parameters;
+ }
+
public List<String> getDeduplicatedAggregations() {
return deduplicatedAggregations;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7daa1e8d60..4f8eba913b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.List;
public abstract class AggregateResult {
@@ -46,6 +47,7 @@ public abstract class AggregateResult {
private float floatValue;
private double doubleValue;
private Binary binaryValue;
+ private List<Integer> intArrayValue;
protected boolean hasCandidateResult;
@@ -67,6 +69,7 @@ public abstract class AggregateResult {
*
* @param statistics chunkStatistics or pageStatistics
*/
+ // update data by headers
public abstract void updateResultFromStatistics(Statistics statistics)
throws QueryProcessException;
@@ -76,6 +79,7 @@ public abstract class AggregateResult {
*
* @param batchIterator the data in Page
*/
+ // update by original data
public abstract void updateResultFromPageData(IBatchDataIterator batchIterator)
throws IOException, QueryProcessException;
@@ -266,6 +270,15 @@ public abstract class AggregateResult {
this.intValue = intValue;
}
+ protected List<Integer> getIntArrayValue() {
+ return intArrayValue;
+ }
+
+ public void setIntArrayValue(List<Integer> intArrayValue) {
+ this.hasCandidateResult = true;
+ this.intArrayValue = intArrayValue;
+ }
+
protected long getLongValue() {
return longValue;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
index a3c651835c..44e00e3024 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
@@ -35,7 +35,8 @@ public enum AggregationType {
MIN_TIME,
MAX_VALUE,
MIN_VALUE,
- EXTREME;
+ EXTREME,
+ DODDS;
/**
* give an integer to return a data type.
@@ -65,6 +66,8 @@ public enum AggregationType {
return MIN_VALUE;
case 9:
return EXTREME;
+ case 10:
+ return DODDS;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + i);
}
@@ -103,6 +106,9 @@ public enum AggregationType {
case EXTREME:
i = 9;
break;
+ case DODDS:
+ i = 10;
+ break;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + this.name());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DoddsAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DoddsAggrResult.java
new file mode 100644
index 0000000000..045456c97e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DoddsAggrResult.java
@@ -0,0 +1,233 @@
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
+import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class DoddsAggrResult extends AggregateResult {
+ private TSDataType seriesDataType;
+ private int count;
+ private List<Long> outliers = new ArrayList<>(); // store the timestamps of outliers
+ private int k, w, s;
+ private double r;
+ private int delta;
+ private double gamma;
+ private int fileNum;
+ private boolean ifWithUpperBound;
+ private Statistics statisticsInstance = new DoubleStatistics();
+
+ // TODO: 分桶信息存到中间变量buckets中,结果信息存到结果变量outliers中
+
+ public DoddsAggrResult(TSDataType seriesDataType) {
+ super(TSDataType.VECTOR, AggregationType.DODDS);
+ this.seriesDataType = seriesDataType;
+ reset();
+ }
+
+ @Override
+ public List<Integer> getResult() {
+ // TODO: revise statistics to return the detection result
+
+ return hasCandidateResult() ? getIntArrayValue() : null;
+ }
+
+ @Override
+ public void updateResultFromStatistics(Statistics statistics) {
+ if (statistics.getCount() == 0) {
+ return;
+ }
+ if (statistics instanceof DoubleStatistics) {
+ count += statistics.getCount();
+ } else {
+ throw new StatisticsClassException("Does not support: DODDS");
+ }
+ // setIntArrayValue(statisticsInstance.getBuckets());
+ }
+
+ @Override
+ public void updateResultFromPageData(IBatchDataIterator batchIterator)
+ throws IOException, QueryProcessException {
+ updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ }
+
+ @Override
+ public void updateResultFromPageData(
+ IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ while (batchIterator.hasNext(minBound, maxBound)) {
+ if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
+ break;
+ }
+ statisticsInstance.update(batchIterator.currentTime(), (double) batchIterator.currentValue());
+ batchIterator.next();
+ }
+ }
+
+ @Override
+ public void updateResultUsingTimestamps(
+ long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ statisticsInstance.update(timestamps[i], (double) values[i]);
+ }
+ }
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+ int i = 0;
+ while (valueIterator.hasNext() && i < length) {
+ statisticsInstance.update(timestamps[i], (double) valueIterator.next());
+ i++;
+ }
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ // TODO:修改判断
+ return false;
+ }
+
+ @Override
+ public void merge(AggregateResult another) {
+ DoddsAggrResult anotherVar = (DoddsAggrResult) another;
+ if (anotherVar.getStatisticsInstance().getCount() == 0) {
+ return;
+ }
+ // if (another.getResult() instanceof List<?>){
+ // int i = 0;
+ // for (Object o : (List<?>)another.getResult()){
+ // mergedBuckets.set(i, mergedBuckets.get(i) + Integer.class.cast(o));
+ // i += 1;
+ // }
+ // }
+ }
+
+ @Override
+ protected void deserializeSpecificFields(ByteBuffer buffer) {
+ this.seriesDataType = TSDataType.deserialize(buffer.get());
+ // for (int i=0; i<Statistics.bucketCnt; i++){
+ // this.mergedBuckets.set(i, buffer.getInt());
+ // }
+ }
+
+ @Override
+ protected void serializeSpecificFields(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(seriesDataType, outputStream);
+ // for (int i=0; i<Statistics.bucketCnt; i++){
+ // ReadWriteIOUtils.write(this.mergedBuckets.get(i), outputStream);
+ // }
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ // for (int i=0; i<Statistics.bucketCnt; i++){
+ // mergedBuckets.set(i, 0);
+ // }
+ statisticsInstance = new DoubleStatistics();
+ }
+
+ public Statistics getStatisticsInstance() {
+ return statisticsInstance;
+ }
+
+ public void setStatisticsInstance(Statistics statisticsInstance) {
+ this.statisticsInstance = statisticsInstance;
+ }
+
+ public void setParameters(Map<String, String> parameters) {
+ if (parameters.containsKey("k")
+ && parameters.containsKey("r")
+ && parameters.containsKey("w")
+ && parameters.containsKey("s")) {
+ this.k = Integer.parseInt(parameters.get("k"));
+ this.r = Double.parseDouble(parameters.get("r"));
+ this.w = Integer.parseInt(parameters.get("w"));
+ this.s = Integer.parseInt(parameters.get("s"));
+
+ System.out.println("Parameters are successfully set.");
+ } else System.out.println("Parameters are not obtained in AggrResult.");
+
+ if (parameters.containsKey("d")) this.delta = Integer.parseInt(parameters.get("d")) * 1000;
+ else this.delta = 60000;
+ if (parameters.containsKey("g")) this.gamma = Double.parseDouble(parameters.get("g"));
+ else this.gamma = 0.1;
+ if (parameters.containsKey("f")) this.fileNum = Integer.parseInt(parameters.get("f"));
+ else this.fileNum = 1;
+ if (parameters.containsKey("b"))
+ this.ifWithUpperBound = Boolean.parseBoolean(parameters.get("b"));
+ else this.ifWithUpperBound = true;
+ }
+
+ public double getR() {
+ return r;
+ }
+
+ public int getK() {
+ return k;
+ }
+
+ public int getW() {
+ return w;
+ }
+
+ public int getS() {
+ return s;
+ }
+
+ public int getDelta() {
+ return delta;
+ }
+
+ public double getGamma() {
+ return gamma;
+ }
+
+ public int getFileNum() {
+ return fileNum;
+ }
+
+ public boolean getIfWithUpperBound() {
+ return ifWithUpperBound;
+ }
+
+ public void detectOutliers() {
+ // int j = (int) Math.ceil(r / Statistics.gamma);
+ // int l = (int) Math.floor(r / Statistics.gamma);
+ // int beta = Statistics.bucketCnt;
+ // int lowBound, highBound;
+ // for (int u=0; u < beta; u++){
+ // lowBound = 0;
+ // highBound = 0;
+ // for (int i = Math.max(0, u-l+1); i < Math.min(beta, u+l-1); i++) {
+ // lowBound += mergedBuckets.get(i);
+ // }
+ // if (lowBound >= k) continue;
+ //
+ // for (int i = Math.max(0, u-j); i < Math.min(beta, u+j); i++){
+ // highBound += mergedBuckets.get(i);
+ // }
+ // if (j - r/Statistics.gamma >= 0.5){
+ // highBound = Math.max(highBound - mergedBuckets.get(u-j), highBound -
+ // mergedBuckets.get(u+j));
+ // }
+ //
+ // if (highBound < k) {continue;} //TODO: update outlier set
+ // }
+
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index fc33f2639a..149346d63c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
@@ -34,6 +35,8 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.aggregation.impl.DoddsAggrResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.SingleDataSet;
@@ -47,6 +50,8 @@ import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.ValueIterator;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -59,14 +64,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
@@ -76,10 +75,12 @@ public class AggregationExecutor {
private List<PartialPath> selectedSeries;
protected List<TSDataType> dataTypes;
protected List<String> aggregations;
+ protected List<Map<String, String>> parameters;
protected IExpression expression;
protected boolean ascending;
+ private final TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
protected QueryContext context;
- protected AggregateResult[] aggregateResultList;
+ protected AggregateResult[] aggregssateResultList;
/** aggregation batch calculation size. */
private int aggregateFetchSize;
@@ -91,6 +92,7 @@ public class AggregationExecutor {
.forEach(k -> selectedSeries.add(((MeasurementPath) k).transformToExactPath()));
this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
this.aggregations = aggregationPlan.getDeduplicatedAggregations();
+ this.parameters = aggregationPlan.getParameters();
this.expression = aggregationPlan.getExpression();
this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
this.ascending = aggregationPlan.isAscending();
@@ -101,7 +103,6 @@ public class AggregationExecutor {
/** execute aggregate function with only time filter or no filter. */
public QueryDataSet executeWithoutValueFilter(AggregationPlan aggregationPlan)
throws StorageEngineException, IOException, QueryProcessException {
-
Filter timeFilter = null;
if (expression != null) {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
@@ -155,13 +156,24 @@ public class AggregationExecutor {
throws IOException, QueryProcessException, StorageEngineException {
List<AggregateResult> ascAggregateResultList = new ArrayList<>();
List<AggregateResult> descAggregateResultList = new ArrayList<>();
+
+ DoddsAggrResult doddsAggrResult = null;
boolean[] isAsc = new boolean[aggregateResultList.length];
+ boolean[] isValidity = new boolean[aggregateResultList.length];
+ boolean[] isDodds = new boolean[aggregateResultList.length];
TSDataType tsDataType = dataTypes.get(indexes.get(0));
for (int i : indexes) {
// construct AggregateResult
AggregateResult aggregateResult =
AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
+
+ if (aggregateResult.getAggregationType() == AggregationType.DODDS) {
+ doddsAggrResult = (DoddsAggrResult) aggregateResult;
+ doddsAggrResult.setParameters(parameters.get(i));
+ isDodds[i] = true;
+ continue;
+ }
if (aggregateResult.isAscending()) {
ascAggregateResultList.add(aggregateResult);
isAsc[i] = true;
@@ -169,6 +181,7 @@ public class AggregationExecutor {
descAggregateResultList.add(aggregateResult);
}
}
+
aggregateOneSeries(
seriesPath,
allMeasurementsInDevice,
@@ -178,15 +191,813 @@ public class AggregationExecutor {
ascAggregateResultList,
descAggregateResultList,
null);
-
+ if (doddsAggrResult != null) {
+ aggregateDodds(
+ seriesPath,
+ allMeasurementsInDevice,
+ context,
+ timeFilter,
+ tsDataType,
+ doddsAggrResult,
+ null);
+ }
int ascIndex = 0;
int descIndex = 0;
for (int i : indexes) {
- aggregateResultList[i] =
- isAsc[i]
- ? ascAggregateResultList.get(ascIndex++)
- : descAggregateResultList.get(descIndex++);
+ if (isDodds[i]) {
+ if (doddsAggrResult != null) {
+ aggregateResultList[i] = doddsAggrResult;
+ }
+ } else {
+ aggregateResultList[i] =
+ (isAsc[i]
+ ? ascAggregateResultList.get(ascIndex++)
+ : descAggregateResultList.get(descIndex++));
+ }
+ }
+ }
+
+ private void aggregateDodds(
+ PartialPath seriesPath,
+ Set<String> measurements,
+ QueryContext context,
+ Filter timeFilter,
+ TSDataType tsDataType,
+ DoddsAggrResult doddsAggrResult,
+ TsFileFilter fileFilter)
+ throws QueryProcessException, StorageEngineException, IOException {
+ // // construct series reader without value filter
+ QueryDataSource queryDataSource =
+ QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
+ if (fileFilter != null) {
+ QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
+ }
+
+ double r = doddsAggrResult.getR();
+ int k = doddsAggrResult.getK();
+ int w = doddsAggrResult.getW();
+ int s = doddsAggrResult.getS();
+
+ System.out.println("r=" + r + ", k=" + k + ", w=" + w + ", s=" + s);
+
+// long delta = doddsAggrResult.getDelta();
+// double gamma = doddsAggrResult.getGamma();
+// int fileNum = doddsAggrResult.getFileNum();
+ int delta = IoTDBDescriptor.getInstance().getConfig().getDelta();
+ double gamma = IoTDBDescriptor.getInstance().getConfig().getGamma();
+ int fileNum = IoTDBDescriptor.getInstance().getConfig().getFileNum();
+ boolean ifWithUpperBound = doddsAggrResult.getIfWithUpperBound();
+
+ System.out.println(
+ "delta="
+ + delta
+ + ", gamma="
+ + gamma
+ + ", fileNum="
+ + fileNum
+ + ", ifWithUpperBound"
+ + ifWithUpperBound);
+
+ // update filter by TTL
+ timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+ IAggregateReader seriesReader =
+ new SeriesAggregateReader(
+ seriesPath,
+ measurements,
+ tsDataType,
+ context,
+ queryDataSource,
+ timeFilter,
+ null,
+ null,
+ true);
+ long startTime = Long.MAX_VALUE, endTime = Long.MIN_VALUE;
+ double minValue = Double.MAX_VALUE, maxValue = Double.MIN_VALUE;
+ // TODO: 构造原始数据的reader,计算min/maxValue
+ while (seriesReader.hasNextFile()) {
+ Statistics fileStatistic = seriesReader.currentFileStatistics();
+ startTime = Math.min(fileStatistic.getStartTime(), startTime);
+ endTime = Math.max(fileStatistic.getEndTime(), endTime);
+ minValue = Math.min((Double) fileStatistic.getMinValue(), minValue);
+ maxValue = Math.max((Double) fileStatistic.getMaxValue(), maxValue);
+ seriesReader.skipCurrentFile();
+ }
+ startTime = startTime / delta * delta;
+ endTime = (endTime / delta + 1) * delta;
+ minValue = Math.floor(minValue / gamma) * gamma;
+ maxValue = Math.floor(maxValue / gamma + 1) * gamma;
+
+ int bucketsCnt = (int) ((maxValue - minValue) / gamma);
+ int segsCnt = (int) ((endTime - startTime) / delta);
+ System.out.println("bucketsCnt:" + bucketsCnt);
+ System.out.println("segsCnt:" + segsCnt);
+ List<SeriesAggregateReader> seriesReaderList = new ArrayList<>();
+
+ for (int f = 0; f < fileNum; f++) {
+ // construct readers for each series (bucket)
+ for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+ String newPath = seriesPath.getFullPath() + "b.b" + bucketIndex;
+ if (f > 0) {
+ String oldDatasetName = newPath.split("\\.")[1];
+ String newDatasetName = oldDatasetName + Integer.toString(f);
+ newPath = newPath.replace(oldDatasetName, newDatasetName);
+ try {
+ MeasurementPath measurementPath = new MeasurementPath(newPath);
+ QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+ QueryContext newContext = new QueryContext(queryResourceManager.assignQueryId(true));
+ QueryDataSource newQueryDataSource =
+ queryResourceManager.getQueryDataSource(measurementPath, newContext, timeFilter);
+ SeriesAggregateReader curSeriesReader =
+ new SeriesAggregateReader(
+ measurementPath,
+ Collections.singleton("b" + bucketIndex),
+ tsDataType,
+ newContext,
+ newQueryDataSource,
+ timeFilter,
+ null,
+ null,
+ true);
+ seriesReaderList.add(curSeriesReader);
+ } catch (IllegalPathException e) {
+ System.out.println(newPath + " failed.");
+ }
+ } else {
+ try {
+ MeasurementPath measurementPath = new MeasurementPath(newPath);
+ SeriesAggregateReader curSeriesReader =
+ new SeriesAggregateReader(
+ measurementPath,
+ Collections.singleton("b" + bucketIndex),
+ tsDataType,
+ context,
+ queryDataSource,
+ timeFilter,
+ null,
+ null,
+ true);
+ seriesReaderList.add(curSeriesReader);
+ } catch (IllegalPathException e) {
+ System.out.println(newPath + " failed.");
+ }
+ }
+ }
+ }
+
+ // if (fileNum == 1) {
+ // List<Pair<Long, Double>> outlierList = new ArrayList<>();
+ //
+ // int[] mergedBuckets = new int[bucketsCnt];
+ // Queue<List<Integer>> curWindowBuckets = new LinkedList<>();
+ // int lambda = (int) Math.ceil(r / gamma), ell = (int) Math.floor(r / gamma);
+ // int upper, lower, tightUpper;
+ //
+ // int segIndex = 0;
+ // int[] readFlags = new int[bucketsCnt];
+ // SeriesAggregateReader[] seriesReaders = new SeriesAggregateReader[bucketsCnt];
+ // Long[] startTimes = new Long[bucketsCnt];
+ // while (segIndex + w < segsCnt) {
+ // // update buckets, mergedBuckets, curWindowBuckets
+ // List<Integer> buckets = new ArrayList<>();
+ // for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+ //
+ // SeriesAggregateReader curSeriesReader = seriesReaderList.get(bucketIndex);
+ // int readFlag = readFlags[bucketIndex];
+ // boolean finish = false;
+ // while (!finish) {
+ // switch (readFlag) {
+ // case 0:
+ // if (curSeriesReader.hasNextFile()) {
+ // readFlag = 1;
+ // } else {
+ // readFlag = -1;
+ // }
+ // break;
+ // case 1:
+ // if (curSeriesReader.hasNextChunk()) {
+ // readFlag = 2;
+ // } else {
+ // readFlag = 0;
+ // }
+ // break;
+ // case 2:
+ // if (curSeriesReader.hasNextPage()) {
+ // long curStartTime = curSeriesReader.currentPageStatistics().getStartTime();
+ // if (curStartTime >= segIndex * delta && curStartTime < (segIndex + 1) *
+ // delta) {
+ // Statistics pageStatistic = curSeriesReader.currentPageStatistics();
+ // buckets.add(pageStatistic.getCount());
+ // mergedBuckets[bucketIndex] += pageStatistic.getCount();
+ // curSeriesReader.skipCurrentPage();
+ // } else {
+ // buckets.add(0);
+ // }
+ // finish = true;
+ // } else {
+ // readFlag = 1;
+ // }
+ // break;
+ // case -1:
+ // buckets.add(0);
+ // finish = true;
+ // break;
+ // }
+ // }
+ // readFlags[bucketIndex] = readFlag;
+ // }
+ // if (segIndex >= w) {
+ // for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+ // mergedBuckets[bucketIndex] =
+ // mergedBuckets[bucketIndex] - curWindowBuckets.peek().get(bucketIndex);
+ // }
+ // curWindowBuckets.poll();
+ // }
+ // curWindowBuckets.add(buckets);
+ //
+ // if (segIndex < w - 1 || (segIndex - w + 1) % s != 0) {
+ // segIndex++;
+ // continue;
+ // }
+ //
+ // segIndex++;
+ //
+ // // update bounds and outlier detection
+ // for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+ // upper = lower = tightUpper = 0;
+ // for (int tmpIndex = Math.max(0, bucketIndex - ell + 1);
+ // tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + ell - 1);
+ // tmpIndex++) {
+ // lower += mergedBuckets[tmpIndex];
+ // }
+ // for (int tmpIndex = Math.max(0, bucketIndex - lambda);
+ // tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + lambda);
+ // tmpIndex++) {
+ // upper += mergedBuckets[tmpIndex];
+ // }
+ // if (bucketIndex - lambda < 0 && bucketIndex + lambda < bucketsCnt)
+ // tightUpper = upper - mergedBuckets[bucketIndex + lambda];
+ // else if (bucketIndex + lambda > bucketsCnt - 1 && bucketIndex - lambda >= 0)
+ // tightUpper = upper - mergedBuckets[bucketIndex - lambda];
+ // else if (bucketIndex + lambda < bucketsCnt && bucketIndex - lambda >= 0) {
+ // tightUpper =
+ // Math.max(
+ // upper - mergedBuckets[bucketIndex + lambda],
+ // upper - mergedBuckets[bucketIndex - lambda]);
+ // } else tightUpper = upper;
+ // // TODO: outlier detection
+ // if (mergedBuckets[bucketIndex] == 0) continue;
+ // if (lower >= k) {
+ // // System.out.println("All points in this buckets are inliers");
+ // continue;
+ // } else if ((lambda - r / gamma < 0.5 && upper < k)
+ // || (lambda - r / gamma >= 0.5 && tightUpper < k)) {
+ // // TODO: load outliers from PageData
+ // System.out.println("load outliers from PageData");
+ // System.out.println(upper);
+ // String newPath = seriesPath.getFullPath() + "b.b" + bucketIndex;
+ // try {
+ // MeasurementPath measurementPath = new MeasurementPath(newPath);
+ // QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+ // QueryContext newContext = new
+ // QueryContext(queryResourceManager.assignQueryId(true));
+ // QueryDataSource newQueryDataSource =
+ // queryResourceManager.getQueryDataSource(measurementPath, newContext,
+ // timeFilter);
+ // seriesReader =
+ // new SeriesAggregateReader(
+ // measurementPath,
+ // Collections.singleton("b" + bucketIndex),
+ // tsDataType,
+ // newContext,
+ // newQueryDataSource,
+ // timeFilter,
+ // null,
+ // null,
+ // true);
+ //
+ // } catch (IllegalPathException e) {
+ // System.out.println(newPath + " failed.");
+ // }
+ //
+ // boolean finished = false;
+ // while (seriesReader.hasNextFile()) {
+ // while (seriesReader.hasNextChunk()) {
+ // while (seriesReader.hasNextPage()) {
+ // Statistics curStatistics = seriesReader.currentPageStatistics();
+ // if (curStatistics.getStartTime() < (long) (segIndex - w) * delta) {
+ // seriesReader.nextPage();
+ // continue;
+ // }
+ // IBatchDataIterator batchIterator =
+ // seriesReader.nextPage().getBatchDataIterator();
+ // while (batchIterator.hasNext()) {
+ // long t = batchIterator.currentTime();
+ // if (t >= ((long) segIndex * delta)) {
+ // finished = true;
+ // break;
+ // } else {
+ // outlierList.add(
+ // new Pair<>(
+ // batchIterator.currentTime(), (double)
+ // batchIterator.currentValue()));
+ // // System.out.println(batchIterator.currentTime());
+ // batchIterator.next();
+ // }
+ // }
+ // if (finished) break;
+ // }
+ // if (finished) break;
+ // }
+ // if (finished) break;
+ // }
+ // } else { // TODO: load buckets [bucketIndex] [bucketIndex-lambda]
+ // [bucketIndex+lambda]
+ // // from
+ // // [segIndex] to [segIndex+w]
+ // // load bucket B[bucketIndex]
+ // System.out.println("load and check");
+ // List<Pair<Long, Double>> suspiciousPoints = new ArrayList<>();
+ // List<Pair<Long, Double>> checkPoints = new ArrayList<>();
+ // // load bucket B[bucketIndex] into suspiciousPoints
+ // String newPath = seriesPath.getFullPath() + "b.b" + bucketIndex;
+ // try {
+ // MeasurementPath measurementPath = new MeasurementPath(newPath);
+ // QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+ // QueryContext newContext = new
+ // QueryContext(queryResourceManager.assignQueryId(true));
+ // QueryDataSource newQueryDataSource =
+ // queryResourceManager.getQueryDataSource(measurementPath, newContext,
+ // timeFilter);
+ // seriesReader =
+ // new SeriesAggregateReader(
+ // measurementPath,
+ // Collections.singleton("b" + bucketIndex),
+ // tsDataType,
+ // newContext,
+ // newQueryDataSource,
+ // timeFilter,
+ // null,
+ // null,
+ // true);
+ // } catch (IllegalPathException e) {
+ // System.out.println(newPath + " failed.");
+ // }
+ //
+ // boolean finished = false;
+ // while (seriesReader.hasNextFile()) {
+ // while (seriesReader.hasNextChunk()) {
+ // while (seriesReader.hasNextPage()) {
+ // Statistics curStatistics = seriesReader.currentPageStatistics();
+ // if (curStatistics.getStartTime() < (segIndex - w) * delta) {
+ // seriesReader.nextPage();
+ // continue;
+ // } else {
+ // IBatchDataIterator batchIterator =
+ // seriesReader.nextPage().getBatchDataIterator();
+ // while (batchIterator.hasNext()) {
+ // long t = batchIterator.currentTime();
+ // if (t >= (long) (segIndex * delta)) {
+ // finished = true;
+ // break;
+ // } else {
+ // suspiciousPoints.add(
+ // new Pair<>(
+ // batchIterator.currentTime(),
+ // (double) batchIterator.currentValue()));
+ // batchIterator.next();
+ // }
+ // }
+ // if (finished) break;
+ // }
+ // }
+ // if (finished) break;
+ // }
+ // if (finished) break;
+ // }
+ // // load buckets B[bucketIndex-lambda], B[bucketIndex+lambda] into checkPoints
+ // for (int index : new int[] {bucketIndex - lambda, bucketIndex + lambda}) {
+ // newPath = seriesPath.getFullPath() + "b.b" + index;
+ // try {
+ //
+ // MeasurementPath measurementPath = new MeasurementPath(newPath);
+ // QueryResourceManager queryResourceManager =
+ // QueryResourceManager.getInstance();
+ // QueryContext newContext =
+ // new QueryContext(queryResourceManager.assignQueryId(true));
+ // QueryDataSource newQueryDataSource =
+ // queryResourceManager.getQueryDataSource(
+ // measurementPath, newContext, timeFilter);
+ // seriesReader =
+ // new SeriesAggregateReader(
+ // measurementPath,
+ // Collections.singleton("b" + index),
+ // tsDataType,
+ // newContext,
+ // newQueryDataSource,
+ // timeFilter,
+ // null,
+ // null,
+ // true);
+ // } catch (IllegalPathException e) {
+ // System.out.println(newPath + " failed.");
+ // }
+ //
+ // while (seriesReader.hasNextFile()) {
+ // while (seriesReader.hasNextChunk()) {
+ // while (seriesReader.hasNextPage()) {
+ // Statistics curStatistics = seriesReader.currentPageStatistics();
+ // if (curStatistics.getStartTime() < (segIndex - w) * delta) {
+ // seriesReader.nextPage();
+ // continue;
+ // }
+ // IBatchDataIterator batchIterator =
+ // seriesReader.nextPage().getBatchDataIterator();
+ // while (batchIterator.hasNext()) {
+ // checkPoints.add(
+ // new Pair<>(
+ // batchIterator.currentTime(), (double)
+ // batchIterator.currentValue()));
+ // if (batchIterator.hasNext()) {
+ // batchIterator.next();
+ // if (batchIterator.currentTime() >= segIndex * delta) {
+ // finished = true;
+ // break;
+ // }
+ // } else {
+ // break;
+ // }
+ // }
+ // if (finished) break;
+ // }
+ // if (finished) break;
+ // }
+ // if (finished) break;
+ // }
+ // }
+ // // detection
+ // int trueNeighbor = lower;
+ // for (Pair<Long, Double> p1 : suspiciousPoints) {
+ // for (Pair<Long, Double> p2 : checkPoints) {
+ // if (Math.abs(p1.getSecond() - p2.getSecond()) <= r) trueNeighbor += 1;
+ // }
+ // if (trueNeighbor < k) {
+ // outlierList.add(p1);
+ // }
+ // }
+ // }
+ // }
+ // }
+ // System.out.println("Outliers Num:" + outlierList.size());
+ // // for (Pair<Long, Double> p : outlierList) {
+ // //// System.out.println(p.getSecond());
+ // // }
+ // }
+
+ Map<Long, Double> outliers = new HashMap<>();
+
+ int lambda = (int) Math.ceil(r / gamma), ell = (int) Math.floor(r / gamma);
+ int upper, lower, tightUpper;
+
+ int segIndex = 0;
+ int[] readFlags = new int[bucketsCnt * fileNum];
+ int[][] merged = new int[fileNum][bucketsCnt];
+ Queue<int[][]> expired = new LinkedList<>();
+
+ while (segIndex < segsCnt) {
+ // update buckets, mergedBuckets, curWindowBuckets
+ int[][] current = new int[fileNum][bucketsCnt];
+ for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+ for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+ SeriesAggregateReader curSeriesReader =
+ seriesReaderList.get(bucketIndex + fileIndex * bucketsCnt);
+ int readFlag = readFlags[bucketIndex + fileIndex * bucketsCnt];
+ boolean finish = false;
+ while (!finish) {
+ switch (readFlag) {
+ case 0:
+ if (curSeriesReader.hasNextFile()) {
+ readFlag = 1;
+ } else {
+ readFlag = -1;
+ }
+ break;
+ case 1:
+ if (curSeriesReader.hasNextChunk()) {
+ readFlag = 2;
+ } else {
+ readFlag = 0;
+ }
+ break;
+ case 2:
+ if (curSeriesReader.hasNextPage()) {
+ long curStartTime = curSeriesReader.currentPageStatistics().getStartTime();
+ if (curStartTime >= segIndex * delta && curStartTime < (segIndex + 1) * delta) {
+ Statistics pageStatistic = curSeriesReader.currentPageStatistics();
+ // update incoming
+ current[fileIndex][bucketIndex] = pageStatistic.getCount();
+ if (segIndex < w) {
+ merged[fileIndex][bucketIndex] += pageStatistic.getCount();
+ } else {
+ merged[fileIndex][bucketIndex] =
+ merged[fileIndex][bucketIndex]
+ - expired.peek()[fileIndex][bucketIndex]
+ + pageStatistic.getCount();
+ }
+ curSeriesReader.skipCurrentPage();
+ }
+ finish = true;
+ } else {
+ readFlag = 1;
+ }
+ break;
+ case -1:
+ finish = true;
+ break;
+ }
+ }
+ readFlags[bucketIndex + fileIndex * bucketsCnt] = readFlag;
+ }
+ }
+ if (segIndex >= w) {
+ expired.poll();
+ }
+ expired.add(current);
+ if (segIndex == 0 || (segIndex - w + 1) % s != 0) {
+ segIndex++;
+ continue;
+ }
+ segIndex++;
+
+ // update bounds and outlier detection
+ int[] bucketsCheck = new int[bucketsCnt];
+ int[] bucketsHat = new int[bucketsCnt];
+ if (fileNum > 1) {
+ for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+ for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+ bucketsHat[bucketIndex] += merged[fileIndex][bucketIndex];
+ if (fileIndex == 0 || merged[fileIndex][bucketIndex] > 0) {
+ bucketsCheck[bucketIndex] = merged[fileIndex][bucketIndex];
+ }
+ }
+ }
+ }
+
+ for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+ upper = lower = tightUpper = 0;
+ if (fileNum == 1) {
+ for (int tmpIndex = Math.max(0, bucketIndex - ell + 1);
+ tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + ell - 1);
+ tmpIndex++) lower += merged[0][tmpIndex];
+ for (int tmpIndex = Math.max(0, bucketIndex - lambda);
+ tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + lambda);
+ tmpIndex++) upper += merged[0][tmpIndex];
+
+ if (bucketIndex - lambda < 0 && bucketIndex + lambda < bucketsCnt)
+ tightUpper = upper - merged[0][bucketIndex + lambda];
+ else if (bucketIndex + lambda > bucketsCnt - 1 && bucketIndex - lambda >= 0)
+ tightUpper = upper - merged[0][bucketIndex - lambda];
+ else if (bucketIndex + lambda < bucketsCnt && bucketIndex - lambda >= 0) {
+ tightUpper =
+ Math.max(
+ upper - merged[0][bucketIndex + lambda],
+ upper - merged[0][bucketIndex - lambda]);
+ } else tightUpper = upper;
+ }
+
+ if (fileNum > 1) {
+ for (int tmpIndex = Math.max(0, bucketIndex - ell + 1);
+ tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + ell - 1);
+ tmpIndex++) {
+ lower += bucketsCheck[tmpIndex];
+ }
+ for (int tmpIndex = Math.max(0, bucketIndex - lambda);
+ tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + lambda);
+ tmpIndex++) {
+ upper += bucketsHat[tmpIndex];
+ }
+ if (bucketIndex - lambda < 0 && bucketIndex + lambda < bucketsCnt)
+ tightUpper = upper - bucketsHat[bucketIndex + lambda];
+ else if (bucketIndex + lambda > bucketsCnt - 1 && bucketIndex - lambda >= 0)
+ tightUpper = upper - bucketsHat[bucketIndex - lambda];
+ else if (bucketIndex + lambda < bucketsCnt && bucketIndex - lambda >= 0) {
+ tightUpper =
+ Math.max(
+ upper - bucketsHat[bucketIndex + lambda],
+ upper - bucketsHat[bucketIndex - lambda]);
+ } else tightUpper = upper;
+ }
+
+ // TODO: outlier detection
+ if ((fileNum == 1 && merged[0][bucketIndex] == 0)
+ || (fileNum > 1 && bucketsHat[bucketIndex] == 0)) continue;
+ if (lower >= k) {
+ continue;
+ } else if (ifWithUpperBound
+ && ((lambda - r / gamma < 0.5 && upper < k)
+ || (lambda - r / gamma >= 0.5 && tightUpper < k))) {
+ // TODO: load outliers from PageData
+ System.out.println("load outliers from PageData");
+ for (int f = fileNum - 1; f >= 0; f--) {
+ String newPath = seriesPath.getFullPath() + "b.b" + bucketIndex;
+ if (f > 0) {
+ String oldDatasetName = newPath.split("\\.")[1];
+ String newDatasetName = oldDatasetName + Integer.toString(f);
+ newPath = newPath.replace(oldDatasetName, newDatasetName);
+ }
+ try {
+ MeasurementPath measurementPath = new MeasurementPath(newPath);
+ QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+ QueryContext newContext = new QueryContext(queryResourceManager.assignQueryId(true));
+ QueryDataSource newQueryDataSource =
+ queryResourceManager.getQueryDataSource(measurementPath, newContext, timeFilter);
+ seriesReader =
+ new SeriesAggregateReader(
+ measurementPath,
+ measurements,
+ tsDataType,
+ newContext,
+ newQueryDataSource,
+ timeFilter,
+ null,
+ null,
+ true);
+ } catch (IllegalPathException e) {
+ System.out.println(newPath + " failed.");
+ }
+
+ boolean finished = false;
+ while (seriesReader.hasNextFile()) {
+ while (seriesReader.hasNextChunk()) {
+ while (seriesReader.hasNextPage()) {
+ Statistics curStatistics = seriesReader.currentPageStatistics();
+ if (curStatistics.getStartTime() < (segIndex - w) * delta) {
+ seriesReader.nextPage();
+ continue;
+ }
+ IBatchDataIterator batchIterator = seriesReader.nextPage().getBatchDataIterator();
+ while (batchIterator.hasNext()) {
+ if (!outliers.containsKey(batchIterator.currentTime())) {
+ outliers.put(
+ batchIterator.currentTime(), (double) batchIterator.currentValue());
+ }
+ batchIterator.next();
+ if (batchIterator.currentTime() >= (long) segIndex * delta) {
+ finished = true;
+ break;
+ }
+ }
+ if (finished) break;
+ }
+ if (finished) break;
+ }
+ if (finished) break;
+ }
+ }
+ } else {
+ // TODO: load buckets [bucketIndex] [bucketIndex-lambda] [bucketIndex+lambda]
+ System.out.println("load and check");
+
+ Map<Long, Double> suspiciousPoints = new HashMap<>();
+ Map<Long, Double> checkPoints = new HashMap<>();
+ // load bucket B[bucketIndex] into suspiciousPoints
+ for (int f = fileNum - 1; f >= 0; f--) {
+ String newPath = seriesPath.getFullPath() + "b.b" + bucketIndex;
+ if (f > 0) {
+ String oldDatasetName = newPath.split("\\.")[1];
+ String newDatasetName = oldDatasetName + Integer.toString(f);
+ newPath = newPath.replace(oldDatasetName, newDatasetName);
+ }
+ try {
+ MeasurementPath measurementPath = new MeasurementPath(newPath);
+ QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+ QueryContext newContext = new QueryContext(queryResourceManager.assignQueryId(true));
+ QueryDataSource newQueryDataSource =
+ queryResourceManager.getQueryDataSource(measurementPath, newContext, timeFilter);
+ seriesReader =
+ new SeriesAggregateReader(
+ measurementPath,
+ measurements,
+ tsDataType,
+ newContext,
+ newQueryDataSource,
+ timeFilter,
+ null,
+ null,
+ true);
+ } catch (IllegalPathException e) {
+ System.out.println(newPath + " failed.");
+ }
+
+ boolean finished = false;
+ while (seriesReader.hasNextFile()) {
+ while (seriesReader.hasNextChunk()) {
+ while (seriesReader.hasNextPage()) {
+ Statistics curStatistics = seriesReader.currentPageStatistics();
+ if (curStatistics.getStartTime() < (long) (segIndex - w) * delta) {
+ seriesReader.nextPage();
+ continue;
+ } else {
+ IBatchDataIterator batchIterator =
+ seriesReader.nextPage().getBatchDataIterator();
+ while (batchIterator.hasNext()) {
+ if (!suspiciousPoints.containsKey(batchIterator.currentTime())) {
+ suspiciousPoints.put(
+ batchIterator.currentTime(), (double) batchIterator.currentValue());
+ }
+ batchIterator.next();
+ if (batchIterator.currentTime() >= (long) segIndex * delta) {
+ finished = true;
+ break;
+ }
+ }
+ if (finished) break;
+ }
+ }
+ if (finished) break;
+ }
+ if (finished) break;
+ }
+ }
+ // load buckets B[bucketIndex-lambda], B[bucketIndex+lambda] into checkPoints
+ for (int f = fileNum - 1; f >= 0; f--) {
+ for (int index :
+ new int[] {
+ bucketIndex - lambda, bucketIndex + lambda, bucketIndex - ell, bucketIndex + ell
+ }) {
+ if (index < 0 || index > bucketsCnt - 1) continue;
+ String newPath = seriesPath.getFullPath() + "b.b" + index;
+ if (f > 0) {
+ String oldDatasetName = newPath.split("\\.")[1];
+ String newDatasetName = oldDatasetName + Integer.toString(f);
+ newPath = newPath.replace(oldDatasetName, newDatasetName);
+ }
+ try {
+ MeasurementPath measurementPath = new MeasurementPath(newPath);
+ QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+ QueryContext newContext =
+ new QueryContext(queryResourceManager.assignQueryId(true));
+ QueryDataSource newQueryDataSource =
+ queryResourceManager.getQueryDataSource(
+ measurementPath, newContext, timeFilter);
+ seriesReader =
+ new SeriesAggregateReader(
+ measurementPath,
+ measurements,
+ tsDataType,
+ newContext,
+ newQueryDataSource,
+ timeFilter,
+ null,
+ null,
+ true);
+ } catch (IllegalPathException e) {
+ System.out.println(newPath + " failed.");
+ }
+ boolean finished = false;
+ while (seriesReader.hasNextFile()) {
+ while (seriesReader.hasNextChunk()) {
+ while (seriesReader.hasNextPage()) {
+ Statistics curStatistics = seriesReader.currentPageStatistics();
+ if (curStatistics.getStartTime() < (segIndex - w) * delta) {
+ seriesReader.nextPage();
+ continue;
+ }
+ IBatchDataIterator batchIterator =
+ seriesReader.nextPage().getBatchDataIterator();
+ while (batchIterator.hasNext()) {
+ if (!checkPoints.containsKey(batchIterator.currentTime())) {
+ checkPoints.put(
+ batchIterator.currentTime(), (double) batchIterator.currentValue());
+ }
+ batchIterator.next();
+ if (batchIterator.currentTime() >= (long) segIndex * delta) {
+ finished = true;
+ break;
+ }
+ }
+ if (finished) break;
+ }
+ if (finished) break;
+ }
+ if (finished) break;
+ }
+ }
+ }
+ // detection
+ int trueNeighbor = lower;
+ for (Long t1 : suspiciousPoints.keySet()) {
+ for (Long t2 : checkPoints.keySet()) {
+ if (Math.abs(suspiciousPoints.get(t1) - checkPoints.get(t2)) <= r) trueNeighbor += 1;
+ }
+ if (trueNeighbor < k) outliers.put(t1, suspiciousPoints.get(t1));
+ }
+ }
+ }
+ }
+ for (Long o : outliers.keySet()){
+ System.out.println(o);
}
+ System.out.println("Outliers Num:" + outliers.keySet().size());
}
protected void aggregateOneAlignedSeries(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
index 987cdcd451..9894676046 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
@@ -67,6 +67,8 @@ public class AggregateResultFactory {
return !ascending
? new LastValueDescAggrResult(dataType)
: new LastValueAggrResult(dataType);
+ case SQLConstant.DODDS:
+ return new DoddsAggrResult(dataType);
default:
throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
}
@@ -98,6 +100,8 @@ public class AggregateResultFactory {
return new SumAggrResult(dataType);
case SQLConstant.LAST_VALUE:
return new LastValueDescAggrResult(dataType);
+ case SQLConstant.DODDS:
+ return new DoddsAggrResult(dataType);
default:
throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
}
@@ -130,6 +134,8 @@ public class AggregateResultFactory {
return new MinValueAggrResult(dataType);
case EXTREME:
return new ExtremeAggrResult(dataType);
+ case DODDS:
+ return new DoddsAggrResult(dataType);
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + aggregationType.name());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java
new file mode 100644
index 0000000000..4ef688a7e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.filter.executor;
+
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+public interface IPlanExecutor {
+
+ /**
+ * process query plan of qp layer, construct queryDataSet.
+ *
+ * @param queryPlan QueryPlan
+ * @return QueryDataSet
+ */
+ QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
+ throws IOException, StorageEngineException, QueryFilterOptimizationException,
+ QueryProcessException, MetadataException, SQLException, TException, InterruptedException;
+
+ /**
+ * Process Non-Query Physical plan, including insert/update/delete operation of
+ * data/metadata/Privilege
+ *
+ * @param plan Physical Non-Query Plan
+ */
+ boolean processNonQuery(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException, StorageEngineException;
+
+ /**
+ * execute update command and return whether the operator is successful.
+ *
+ * @param path : update series seriesPath
+ * @param startTime start time in update command
+ * @param endTime end time in update command
+ * @param value - in type of string
+ */
+ void update(PartialPath path, long startTime, long endTime, String value)
+ throws QueryProcessException;
+
+ /**
+ * execute delete command and return whether the operator is successful.
+ *
+ * @param deletePlan physical delete plan
+ */
+ void delete(DeletePlan deletePlan) throws QueryProcessException;
+
+ /**
+ * execute delete command and return whether the operator is successful.
+ *
+ * @param path delete series seriesPath
+ * @param startTime start time in delete command
+ * @param endTime end time in delete command
+ * @param planIndex index of the deletion plan
+ * @param partitionFilter specify involving time partitions, if null, all partitions are involved
+ */
+ void delete(
+ PartialPath path,
+ long startTime,
+ long endTime,
+ long planIndex,
+ TimePartitionFilter partitionFilter)
+ throws QueryProcessException;
+
+ /**
+ * execute insert command and return whether the operator is successful.
+ *
+ * @param insertRowPlan physical insert plan
+ */
+ void insert(InsertRowPlan insertRowPlan) throws QueryProcessException;
+
+ /**
+ * execute insert command and return whether the operator is successful.
+ *
+ * @param insertRowsPlan physical insert rows plan, which contains multi insertRowPlans
+ */
+ void insert(InsertRowsPlan insertRowsPlan) throws QueryProcessException;
+
+ /**
+ * execute insert command and return whether the operator is successful.
+ *
+ * @param insertRowsOfOneDevicePlan physical insert plan
+ */
+ void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException;
+
+ /**
+ * execute batch insert plan
+ *
+ * @throws BatchProcessException when some of the rows failed
+ */
+ void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException;
+
+ /**
+ * execute multi batch insert plan
+ *
+ * @throws QueryProcessException when some of the rows failed
+ */
+ void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan) throws QueryProcessException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java
new file mode 100644
index 0000000000..84f8b3c5b4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java
@@ -0,0 +1,2176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.filter.executor;
+
+import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
+import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.db.auth.entity.PathPrivilege;
+import org.apache.iotdb.db.auth.entity.Role;
+import org.apache.iotdb.db.auth.entity.User;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.cq.ContinuousQueryService;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager.TaskStatus;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
+import org.apache.iotdb.db.exception.QueryIdNotExsitException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.TriggerExecutionException;
+import org.apache.iotdb.db.exception.TriggerManagementException;
+import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePartitionPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDAFPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.CountPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
+import org.apache.iotdb.db.qp.physical.sys.MergePlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SettlePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildNodesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowLockInfoPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
+import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.db.query.dataset.ShowContinuousQueriesResult;
+import org.apache.iotdb.db.query.dataset.ShowDevicesDataSet;
+import org.apache.iotdb.db.query.dataset.ShowTimeseriesDataSet;
+import org.apache.iotdb.db.query.dataset.SingleDataSet;
+import org.apache.iotdb.db.query.executor.IQueryRouter;
+import org.apache.iotdb.db.query.executor.QueryRouter;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationInformation;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.SettleService;
+import org.apache.iotdb.db.tools.TsFileRewriteTool;
+import org.apache.iotdb.db.utils.AuthUtils;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.EmptyDataSet;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUERY_SQL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
+import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
+import static org.apache.iotdb.rpc.TSStatusCode.INTERNAL_SERVER_ERROR;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+public class PlanExecutor implements IPlanExecutor {
+
+ private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
+ private static final Logger AUDIT_LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
+ // for data query
+ protected IQueryRouter queryRouter;
+ // for administration
+ private final IAuthorizer authorizer;
+
+ private ThreadPoolExecutor insertionPool;
+
+ private static final String INSERT_MEASUREMENTS_FAILED_MESSAGE = "failed to insert measurements ";
+
+ public PlanExecutor() throws QueryProcessException {
+ queryRouter = new QueryRouter();
+ try {
+ authorizer = BasicAuthorizer.getInstance();
+ } catch (AuthException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ }
+
+ @Override
+ public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
+ throws IOException, StorageEngineException, QueryFilterOptimizationException,
+ QueryProcessException, MetadataException, InterruptedException {
+ if (queryPlan instanceof QueryPlan) {
+ return processDataQuery((QueryPlan) queryPlan, context);
+ } else if (queryPlan instanceof AuthorPlan) {
+ return processAuthorQuery((AuthorPlan) queryPlan);
+ } else if (queryPlan instanceof ShowPlan) {
+ return processShowQuery((ShowPlan) queryPlan, context);
+ } else {
+ throw new QueryProcessException(String.format("Unrecognized query plan %s", queryPlan));
+ }
+ }
+
+ @Override
+ public boolean processNonQuery(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+ switch (plan.getOperatorType()) {
+ case DELETE:
+ delete((DeletePlan) plan);
+ return true;
+ case INSERT:
+ insert((InsertRowPlan) plan);
+ return true;
+ case BATCH_INSERT_ONE_DEVICE:
+ insert((InsertRowsOfOneDevicePlan) plan);
+ return true;
+ case BATCH_INSERT_ROWS:
+ insert((InsertRowsPlan) plan);
+ return true;
+ case BATCH_INSERT:
+ insertTablet((InsertTabletPlan) plan);
+ return true;
+ case MULTI_BATCH_INSERT:
+ insertTablet((InsertMultiTabletPlan) plan);
+ return true;
+ case CREATE_ROLE:
+ case DELETE_ROLE:
+ case CREATE_USER:
+ case REVOKE_USER_ROLE:
+ case REVOKE_ROLE_PRIVILEGE:
+ case REVOKE_USER_PRIVILEGE:
+ case GRANT_ROLE_PRIVILEGE:
+ case GRANT_USER_PRIVILEGE:
+ case GRANT_USER_ROLE:
+ case MODIFY_PASSWORD:
+ case DELETE_USER:
+ AuthorPlan author = (AuthorPlan) plan;
+ return operateAuthor(author);
+ case GRANT_WATERMARK_EMBEDDING:
+ return operateWatermarkEmbedding(((DataAuthPlan) plan).getUsers(), true);
+ case REVOKE_WATERMARK_EMBEDDING:
+ return operateWatermarkEmbedding(((DataAuthPlan) plan).getUsers(), false);
+ case DELETE_TIMESERIES:
+ return deleteTimeSeries((DeleteTimeSeriesPlan) plan);
+ case CREATE_TIMESERIES:
+ return createTimeSeries((CreateTimeSeriesPlan) plan);
+ case CREATE_ALIGNED_TIMESERIES:
+ return createAlignedTimeSeries((CreateAlignedTimeSeriesPlan) plan);
+ case CREATE_MULTI_TIMESERIES:
+ return createMultiTimeSeries((CreateMultiTimeSeriesPlan) plan);
+ case ALTER_TIMESERIES:
+ return alterTimeSeries((AlterTimeSeriesPlan) plan);
+ case SET_STORAGE_GROUP:
+ return setStorageGroup((SetStorageGroupPlan) plan);
+ case DELETE_STORAGE_GROUP:
+ return deleteStorageGroups((DeleteStorageGroupPlan) plan);
+ case TTL:
+ operateTTL((SetTTLPlan) plan);
+ return true;
+ case LOAD_CONFIGURATION:
+ loadConfiguration((LoadConfigurationPlan) plan);
+ return true;
+ case LOAD_FILES:
+ operateLoadFiles((OperateFilePlan) plan);
+ return true;
+ case REMOVE_FILE:
+ operateRemoveFile((OperateFilePlan) plan);
+ return true;
+ case UNLOAD_FILE:
+ operateUnloadFile((OperateFilePlan) plan);
+ return true;
+ case FLUSH:
+ operateFlush((FlushPlan) plan);
+ return true;
+ case MERGE:
+ case FULL_MERGE:
+ operateMerge((MergePlan) plan);
+ return true;
+ case SET_SYSTEM_MODE:
+ operateSetSystemMode((SetSystemModePlan) plan);
+ return true;
+ case CLEAR_CACHE:
+ operateClearCache();
+ return true;
+ case DELETE_PARTITION:
+ DeletePartitionPlan p = (DeletePartitionPlan) plan;
+ TimePartitionFilter filter =
+ (storageGroupName, partitionId) ->
+ storageGroupName.equals(
+ ((DeletePartitionPlan) plan).getStorageGroupName().getFullPath())
+ && p.getPartitionId().contains(partitionId);
+ StorageEngine.getInstance()
+ .removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
+ return true;
+ case CREATE_SCHEMA_SNAPSHOT:
+ operateCreateSnapshot();
+ return true;
+ case CREATE_FUNCTION:
+ return operateCreateFunction((CreateFunctionPlan) plan);
+ case DROP_FUNCTION:
+ return operateDropFunction((DropFunctionPlan) plan);
+ case CREATE_TRIGGER:
+ return operateCreateTrigger((CreateTriggerPlan) plan);
+ case DROP_TRIGGER:
+ return operateDropTrigger((DropTriggerPlan) plan);
+ case START_TRIGGER:
+ return operateStartTrigger((StartTriggerPlan) plan);
+ case STOP_TRIGGER:
+ return operateStopTrigger((StopTriggerPlan) plan);
+ case KILL:
+ try {
+ operateKillQuery((KillQueryPlan) plan);
+ } catch (QueryIdNotExsitException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ return true;
+ case CREATE_TEMPLATE:
+ return createTemplate((CreateTemplatePlan) plan);
+ case APPEND_TEMPLATE:
+ return appendTemplate((AppendTemplatePlan) plan);
+ case PRUNE_TEMPLATE:
+ return pruneTemplate((PruneTemplatePlan) plan);
+ case SET_TEMPLATE:
+ return setTemplate((SetTemplatePlan) plan);
+ case ACTIVATE_TEMPLATE:
+ return activateTemplate((ActivateTemplatePlan) plan);
+ case UNSET_TEMPLATE:
+ return unsetTemplate((UnsetTemplatePlan) plan);
+ case CREATE_CONTINUOUS_QUERY:
+ return operateCreateContinuousQuery((CreateContinuousQueryPlan) plan);
+ case DROP_CONTINUOUS_QUERY:
+ return operateDropContinuousQuery((DropContinuousQueryPlan) plan);
+ case SETTLE:
+ settle((SettlePlan) plan);
+ return true;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("operation %s is not supported", plan.getOperatorType()));
+ }
+ }
+
+ private boolean createTemplate(CreateTemplatePlan createTemplatePlan)
+ throws QueryProcessException {
+ try {
+ IoTDB.metaManager.createSchemaTemplate(createTemplatePlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean appendTemplate(AppendTemplatePlan plan) throws QueryProcessException {
+ try {
+ IoTDB.metaManager.appendSchemaTemplate(plan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean pruneTemplate(PruneTemplatePlan plan) throws QueryProcessException {
+ try {
+ IoTDB.metaManager.pruneSchemaTemplate(plan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean setTemplate(SetTemplatePlan setTemplatePlan) throws QueryProcessException {
+ try {
+ IoTDB.metaManager.setSchemaTemplate(setTemplatePlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean activateTemplate(ActivateTemplatePlan activateTemplatePlan)
+ throws QueryProcessException {
+ try {
+ IoTDB.metaManager.setUsingSchemaTemplate(activateTemplatePlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean unsetTemplate(UnsetTemplatePlan unsetTemplatePlan) throws QueryProcessException {
+ try {
+ IoTDB.metaManager.unsetSchemaTemplate(unsetTemplatePlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean operateCreateFunction(CreateFunctionPlan plan) throws UDFRegistrationException {
+ UDFRegistrationService.getInstance().register(plan.getUdfName(), plan.getClassName(), true);
+ return true;
+ }
+
+ private boolean operateDropFunction(DropFunctionPlan plan) throws UDFRegistrationException {
+ UDFRegistrationService.getInstance().deregister(plan.getUdfName());
+ return true;
+ }
+
+ private boolean operateCreateTrigger(CreateTriggerPlan plan)
+ throws TriggerManagementException, TriggerExecutionException {
+ TriggerRegistrationService.getInstance().register(plan);
+ return true;
+ }
+
+ private boolean operateDropTrigger(DropTriggerPlan plan) throws TriggerManagementException {
+ TriggerRegistrationService.getInstance().deregister(plan);
+ return true;
+ }
+
+ private boolean operateStartTrigger(StartTriggerPlan plan)
+ throws TriggerManagementException, TriggerExecutionException {
+ TriggerRegistrationService.getInstance().activate(plan);
+ return true;
+ }
+
+ private boolean operateStopTrigger(StopTriggerPlan plan) throws TriggerManagementException {
+ TriggerRegistrationService.getInstance().inactivate(plan);
+ return true;
+ }
+
+ private void operateMerge(MergePlan plan) throws StorageEngineException {
+ if (plan.getOperatorType() == OperatorType.FULL_MERGE) {
+ StorageEngine.getInstance().mergeAll(true);
+ } else {
+ StorageEngine.getInstance().mergeAll(false);
+ }
+ }
+
+ private void operateClearCache() {
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
+ }
+
+ private void operateCreateSnapshot() {
+ IoTDB.metaManager.createMTreeSnapshot();
+ }
+
+ private void operateKillQuery(KillQueryPlan killQueryPlan) throws QueryIdNotExsitException {
+ QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+ long killQueryId = killQueryPlan.getQueryId();
+ if (killQueryId != -1) {
+ if (queryTimeManager.getQueryContextMap().get(killQueryId) != null) {
+ queryTimeManager.killQuery(killQueryId);
+ } else {
+ throw new QueryIdNotExsitException(
+ String.format(
+ "Query Id %d is not exist, please check it.", killQueryPlan.getQueryId()));
+ }
+ } else {
+ // if queryId is not specified, kill all running queries
+ if (!queryTimeManager.getQueryContextMap().isEmpty()) {
+ synchronized (queryTimeManager.getQueryContextMap()) {
+ List<Long> queryIdList = new ArrayList<>(queryTimeManager.getQueryContextMap().keySet());
+ for (Long queryId : queryIdList) {
+ queryTimeManager.killQuery(queryId);
+ }
+ }
+ }
+ }
+ }
+
+ private void operateSetSystemMode(SetSystemModePlan plan) {
+ IoTDBDescriptor.getInstance().getConfig().setReadOnly(plan.isReadOnly());
+ }
+
+ private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException {
+ if (plan.getPaths().isEmpty()) {
+ StorageEngine.getInstance().syncCloseAllProcessor();
+ } else {
+ flushSpecifiedStorageGroups(plan);
+ }
+
+ if (!plan.getPaths().isEmpty()) {
+ List<PartialPath> noExistSg = checkStorageGroupExist(plan.getPaths());
+ if (!noExistSg.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ noExistSg.forEach(storageGroup -> sb.append(storageGroup.getFullPath()).append(","));
+ throw new StorageGroupNotSetException(sb.subSequence(0, sb.length() - 1).toString(), true);
+ }
+ }
+ }
+
+ private boolean operateCreateContinuousQuery(CreateContinuousQueryPlan plan)
+ throws ContinuousQueryException {
+ return ContinuousQueryService.getInstance().register(plan, true);
+ }
+
+ private boolean operateDropContinuousQuery(DropContinuousQueryPlan plan)
+ throws ContinuousQueryException {
+ return ContinuousQueryService.getInstance().deregister(plan);
+ }
+
+ public static void flushSpecifiedStorageGroups(FlushPlan plan)
+ throws StorageGroupNotSetException {
+ Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupMap =
+ plan.getStorageGroupPartitionIds();
+ for (Entry<PartialPath, List<Pair<Long, Boolean>>> entry : storageGroupMap.entrySet()) {
+ PartialPath storageGroupName = entry.getKey();
+ // normal flush
+ if (entry.getValue() == null) {
+ if (plan.isSeq() == null) {
+ StorageEngine.getInstance()
+ .closeStorageGroupProcessor(storageGroupName, true, plan.isSync());
+ StorageEngine.getInstance()
+ .closeStorageGroupProcessor(storageGroupName, false, plan.isSync());
+ } else {
+ StorageEngine.getInstance()
+ .closeStorageGroupProcessor(storageGroupName, plan.isSeq(), plan.isSync());
+ }
+ }
+ // partition specified flush, for snapshot flush plan
+ else {
+ List<Pair<Long, Boolean>> partitionIdSequencePairs = entry.getValue();
+ for (Pair<Long, Boolean> pair : partitionIdSequencePairs) {
+ StorageEngine.getInstance()
+ .closeStorageGroupProcessor(storageGroupName, pair.left, pair.right, true);
+ }
+ }
+ }
+ }
+
+ protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
+ throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
+ IOException, InterruptedException {
+ QueryDataSet queryDataSet;
+ if (queryPlan instanceof AlignByDevicePlan) {
+ queryDataSet = getAlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
+ } else {
+ if (queryPlan.getPaths() == null || queryPlan.getPaths().isEmpty()) {
+ // no time series are selected, return EmptyDataSet
+ return new EmptyDataSet();
+ } else if (queryPlan instanceof UDAFPlan) {
+ UDAFPlan udafPlan = (UDAFPlan) queryPlan;
+ queryDataSet = queryRouter.udafQuery(udafPlan, context);
+ } else if (queryPlan instanceof UDTFPlan) {
+ UDTFPlan udtfPlan = (UDTFPlan) queryPlan;
+ queryDataSet = queryRouter.udtfQuery(udtfPlan, context);
+ } else if (queryPlan instanceof GroupByTimeFillPlan) {
+ GroupByTimeFillPlan groupByFillPlan = (GroupByTimeFillPlan) queryPlan;
+ queryDataSet = queryRouter.groupByFill(groupByFillPlan, context);
+ } else if (queryPlan instanceof GroupByTimePlan) {
+ GroupByTimePlan groupByTimePlan = (GroupByTimePlan) queryPlan;
+ queryDataSet = queryRouter.groupBy(groupByTimePlan, context);
+ } else if (queryPlan instanceof QueryIndexPlan) {
+ throw new QueryProcessException("Query index hasn't been supported yet");
+ } else if (queryPlan instanceof AggregationPlan) {
+ AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
+ queryDataSet = queryRouter.aggregate(aggregationPlan, context);
+ } else if (queryPlan instanceof FillQueryPlan) {
+ FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
+ queryDataSet = queryRouter.fill(fillQueryPlan, context);
+ } else if (queryPlan instanceof LastQueryPlan) {
+ queryDataSet = queryRouter.lastQuery((LastQueryPlan) queryPlan, context);
+ } else {
+ queryDataSet = queryRouter.rawDataQuery((RawDataQueryPlan) queryPlan, context);
+ }
+ }
+ queryDataSet.setRowLimit(queryPlan.getRowLimit());
+ queryDataSet.setRowOffset(queryPlan.getRowOffset());
+ queryDataSet.setWithoutAllNull(queryPlan.isWithoutAllNull());
+ queryDataSet.setWithoutAnyNull(queryPlan.isWithoutAnyNull());
+ return queryDataSet;
+ }
+
+ protected AlignByDeviceDataSet getAlignByDeviceDataSet(
+ AlignByDevicePlan plan, QueryContext context, IQueryRouter router) {
+ return new AlignByDeviceDataSet(plan, context, router);
+ }
+
+ protected QueryDataSet processShowQuery(ShowPlan showPlan, QueryContext context)
+ throws QueryProcessException, MetadataException {
+ switch (showPlan.getShowContentType()) {
+ case TTL:
+ return processShowTTLQuery((ShowTTLPlan) showPlan);
+ case FLUSH_TASK_INFO:
+ return processShowFlushTaskInfo();
+ case VERSION:
+ return processShowVersion();
+ case TIMESERIES:
+ return processShowTimeseries((ShowTimeSeriesPlan) showPlan, context);
+ case STORAGE_GROUP:
+ return processShowStorageGroup((ShowStorageGroupPlan) showPlan);
+ case LOCK_INFO:
+ return processShowLockInfo((ShowLockInfoPlan) showPlan);
+ case DEVICES:
+ return processShowDevices((ShowDevicesPlan) showPlan);
+ case CHILD_PATH:
+ return processShowChildPaths((ShowChildPathsPlan) showPlan);
+ case CHILD_NODE:
+ return processShowChildNodes((ShowChildNodesPlan) showPlan);
+ case COUNT_TIMESERIES:
+ return processCountTimeSeries((CountPlan) showPlan);
+ case COUNT_NODE_TIMESERIES:
+ return processCountNodeTimeSeries((CountPlan) showPlan);
+ case COUNT_DEVICES:
+ return processCountDevices((CountPlan) showPlan);
+ case COUNT_STORAGE_GROUP:
+ return processCountStorageGroup((CountPlan) showPlan);
+ case COUNT_NODES:
+ return processCountNodes((CountPlan) showPlan);
+ case MERGE_STATUS:
+ return processShowMergeStatus();
+ case QUERY_PROCESSLIST:
+ return processShowQueryProcesslist();
+ case FUNCTIONS:
+ return processShowFunctions((ShowFunctionsPlan) showPlan);
+ case TRIGGERS:
+ return processShowTriggers();
+ case CONTINUOUS_QUERY:
+ return processShowContinuousQueries();
+ default:
+ throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
+ }
+ }
+
+ private QueryDataSet processCountNodes(CountPlan countPlan) throws MetadataException {
+ int num = getNodesNumInGivenLevel(countPlan.getPath(), countPlan.getLevel());
+ return createSingleDataSet(COLUMN_COUNT, TSDataType.INT32, num);
+ }
+
+ private QueryDataSet processCountNodeTimeSeries(CountPlan countPlan) throws MetadataException {
+ // get the nodes that need to group by first
+ List<PartialPath> nodes = getNodesList(countPlan.getPath(), countPlan.getLevel());
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_COLUMN, false), new PartialPath(COLUMN_COUNT, false)),
+ Arrays.asList(TSDataType.TEXT, TSDataType.INT32));
+ for (PartialPath columnPath : nodes) {
+ RowRecord record = new RowRecord(0);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(columnPath.getFullPath()));
+ Field field1 = new Field(TSDataType.INT32);
+ // get the count of every group
+ field1.setIntV(getPathsNum(columnPath));
+ record.addField(field);
+ record.addField(field1);
+ listDataSet.putRecord(record);
+ }
+ return listDataSet;
+ }
+
+ private QueryDataSet processCountDevices(CountPlan countPlan) throws MetadataException {
+ int num = getDevicesNum(countPlan.getPath());
+ return createSingleDataSet(COLUMN_DEVICES, TSDataType.INT32, num);
+ }
+
+ private QueryDataSet processCountStorageGroup(CountPlan countPlan) throws MetadataException {
+ int num = getStorageGroupNum(countPlan.getPath());
+ return createSingleDataSet(COLUMN_STORAGE_GROUP, TSDataType.INT32, num);
+ }
+
+ private QueryDataSet createSingleDataSet(String columnName, TSDataType columnType, Object val) {
+ SingleDataSet singleDataSet =
+ new SingleDataSet(
+ Collections.singletonList(new PartialPath(columnName, false)),
+ Collections.singletonList(columnType));
+ Field field = new Field(columnType);
+ switch (columnType) {
+ case TEXT:
+ field.setBinaryV(((Binary) val));
+ break;
+ case FLOAT:
+ field.setFloatV(((float) val));
+ break;
+ case INT32:
+ field.setIntV(((int) val));
+ break;
+ case INT64:
+ field.setLongV(((long) val));
+ break;
+ case DOUBLE:
+ field.setDoubleV(((double) val));
+ break;
+ case BOOLEAN:
+ field.setBoolV(((boolean) val));
+ break;
+ default:
+ throw new UnSupportedDataTypeException("Unsupported data type" + columnType);
+ }
+ RowRecord record = new RowRecord(0);
+ record.addField(field);
+ singleDataSet.setRecord(record);
+ return singleDataSet;
+ }
+
+ protected int getDevicesNum(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getDevicesNum(path);
+ }
+
+ private int getStorageGroupNum(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getStorageGroupNum(path);
+ }
+
+ protected int getPathsNum(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getAllTimeseriesCount(path);
+ }
+
+ protected int getNodesNumInGivenLevel(PartialPath path, int level) throws MetadataException {
+ return IoTDB.metaManager.getNodesCountInGivenLevel(path, level);
+ }
+
+ protected List<MeasurementPath> getPathsName(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getMeasurementPaths(path);
+ }
+
+ protected List<PartialPath> getNodesList(PartialPath schemaPattern, int level)
+ throws MetadataException {
+ return IoTDB.metaManager.getNodesListInGivenLevel(schemaPattern, level);
+ }
+
+ private QueryDataSet processCountTimeSeries(CountPlan countPlan) throws MetadataException {
+ int num = getPathsNum(countPlan.getPath());
+ return createSingleDataSet(COLUMN_COUNT, TSDataType.INT32, num);
+ }
+
+ private QueryDataSet processShowDevices(ShowDevicesPlan showDevicesPlan)
+ throws MetadataException {
+ return new ShowDevicesDataSet(showDevicesPlan);
+ }
+
+ private QueryDataSet processShowChildPaths(ShowChildPathsPlan showChildPathsPlan)
+ throws MetadataException {
+ Set<String> childPathsList = getPathNextChildren(showChildPathsPlan.getPath());
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_CHILD_PATHS, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ for (String s : childPathsList) {
+ RowRecord record = new RowRecord(0);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(s));
+ record.addField(field);
+ listDataSet.putRecord(record);
+ }
+ return listDataSet;
+ }
+
+ protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getChildNodePathInNextLevel(path);
+ }
+
+ private QueryDataSet processShowChildNodes(ShowChildNodesPlan showChildNodesPlan)
+ throws MetadataException {
+ // getNodeNextChildren
+ Set<String> childNodesList = getNodeNextChildren(showChildNodesPlan.getPath());
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_CHILD_NODES, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ for (String s : childNodesList) {
+ RowRecord record = new RowRecord(0);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(s));
+ record.addField(field);
+ listDataSet.putRecord(record);
+ }
+ return listDataSet;
+ }
+
+ protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getChildNodeNameInNextLevel(path);
+ }
+
+ protected List<PartialPath> getStorageGroupNames(PartialPath path) throws MetadataException {
+ return IoTDB.metaManager.getMatchedStorageGroups(path);
+ }
+
+ private QueryDataSet processShowStorageGroup(ShowStorageGroupPlan showStorageGroupPlan)
+ throws MetadataException {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_STORAGE_GROUP, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ List<PartialPath> storageGroupList = getStorageGroupNames(showStorageGroupPlan.getPath());
+ addToDataSet(storageGroupList, listDataSet);
+ return listDataSet;
+ }
+
+ private void addToDataSet(Collection<PartialPath> paths, ListDataSet dataSet) {
+ for (PartialPath s : paths) {
+ RowRecord record = new RowRecord(0);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(s.getFullPath()));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ }
+
+ private QueryDataSet processShowLockInfo(ShowLockInfoPlan showLockInfoPlan)
+ throws MetadataException {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_STORAGE_GROUP, false),
+ new PartialPath(COLUMN_LOCK_INFO, false)),
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
+ try {
+ List<PartialPath> storageGroupList = getStorageGroupNames(showLockInfoPlan.getPath());
+ List<String> lockHolderList = StorageEngine.getInstance().getLockInfo(storageGroupList);
+ addLockInfoToDataSet(storageGroupList, lockHolderList, listDataSet);
+ } catch (StorageEngineException e) {
+ throw new MetadataException(e);
+ }
+ return listDataSet;
+ }
+
+ private void addLockInfoToDataSet(
+ List<PartialPath> paths, List<String> lockHolderList, ListDataSet dataSet) {
+ for (int i = 0; i < paths.size(); i++) {
+ RowRecord record = new RowRecord(0);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(paths.get(i).getFullPath()));
+ record.addField(field);
+ field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(lockHolderList.get(i)));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ }
+
+ private QueryDataSet processShowTimeseries(
+ ShowTimeSeriesPlan showTimeSeriesPlan, QueryContext context) throws MetadataException {
+ return new ShowTimeseriesDataSet(showTimeSeriesPlan, context);
+ }
+
+ protected List<IStorageGroupMNode> getAllStorageGroupNodes() {
+ return IoTDB.metaManager.getAllStorageGroupNodes();
+ }
+
+ private QueryDataSet processShowTTLQuery(ShowTTLPlan showTTLPlan) {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_STORAGE_GROUP, false), new PartialPath(COLUMN_TTL, false)),
+ Arrays.asList(TSDataType.TEXT, TSDataType.INT64));
+ Set<PartialPath> selectedSgs = new HashSet<>(showTTLPlan.getStorageGroups());
+
+ List<IStorageGroupMNode> storageGroups = getAllStorageGroupNodes();
+ int timestamp = 0;
+ for (IStorageGroupMNode mNode : storageGroups) {
+ PartialPath sgName = mNode.getPartialPath();
+ if (!selectedSgs.isEmpty() && !selectedSgs.contains(sgName)) {
+ continue;
+ }
+ RowRecord rowRecord = new RowRecord(timestamp++);
+ Field sg = new Field(TSDataType.TEXT);
+ Field ttl;
+ sg.setBinaryV(new Binary(sgName.getFullPath()));
+ if (mNode.getDataTTL() != Long.MAX_VALUE) {
+ ttl = new Field(TSDataType.INT64);
+ ttl.setLongV(mNode.getDataTTL());
+ } else {
+ ttl = null;
+ }
+ rowRecord.addField(sg);
+ rowRecord.addField(ttl);
+ listDataSet.putRecord(rowRecord);
+ }
+
+ return listDataSet;
+ }
+
+ private QueryDataSet processShowVersion() {
+ SingleDataSet singleDataSet =
+ new SingleDataSet(
+ Collections.singletonList(new PartialPath(IoTDBConstant.COLUMN_VERSION, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(IoTDBConstant.VERSION));
+ RowRecord rowRecord = new RowRecord(0);
+ rowRecord.addField(field);
+ singleDataSet.setRecord(rowRecord);
+ return singleDataSet;
+ }
+
+ private QueryDataSet processShowFlushTaskInfo() {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_ITEM, false), new PartialPath(COLUMN_VALUE, false)),
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
+
+ int timestamp = 0;
+ addRowRecordForShowQuery(
+ listDataSet,
+ timestamp++,
+ "total number of flush tasks",
+ Integer.toString(FlushTaskPoolManager.getInstance().getTotalTasks()));
+ addRowRecordForShowQuery(
+ listDataSet,
+ timestamp++,
+ "number of working flush tasks",
+ Integer.toString(FlushTaskPoolManager.getInstance().getWorkingTasksNumber()));
+ addRowRecordForShowQuery(
+ listDataSet,
+ timestamp,
+ "number of waiting flush tasks",
+ Integer.toString(FlushTaskPoolManager.getInstance().getWaitingTasksNumber()));
+ return listDataSet;
+ }
+
+ private QueryDataSet processShowFunctions(ShowFunctionsPlan showPlan)
+ throws QueryProcessException {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_FUNCTION_NAME, false),
+ new PartialPath(COLUMN_FUNCTION_TYPE, false),
+ new PartialPath(COLUMN_FUNCTION_CLASS, false)),
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+
+ appendUDFs(listDataSet, showPlan);
+ appendNativeFunctions(listDataSet, showPlan);
+
+ listDataSet.sort(
+ (r1, r2) ->
+ String.CASE_INSENSITIVE_ORDER.compare(
+ r1.getFields().get(0).getStringValue(), r2.getFields().get(0).getStringValue()));
+ return listDataSet;
+ }
+
+ @SuppressWarnings("squid:S3776")
+ private void appendUDFs(ListDataSet listDataSet, ShowFunctionsPlan showPlan)
+ throws QueryProcessException {
+ for (UDFRegistrationInformation info :
+ UDFRegistrationService.getInstance().getRegistrationInformation()) {
+ RowRecord rowRecord = new RowRecord(0); // ignore timestamp
+ rowRecord.addField(Binary.valueOf(info.getFunctionName()), TSDataType.TEXT);
+ String functionType = "";
+ try {
+ if (info.isBuiltin()) {
+ if (info.isUDTF()) {
+ functionType = FUNCTION_TYPE_BUILTIN_UDTF;
+ } else if (info.isUDAF()) {
+ functionType = FUNCTION_TYPE_BUILTIN_UDAF;
+ }
+ } else {
+ if (info.isUDTF()) {
+ functionType = FUNCTION_TYPE_EXTERNAL_UDTF;
+ } else if (info.isUDAF()) {
+ functionType = FUNCTION_TYPE_EXTERNAL_UDAF;
+ }
+ }
+ } catch (InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException e) {
+ throw new QueryProcessException(e.toString());
+ }
+ rowRecord.addField(Binary.valueOf(functionType), TSDataType.TEXT);
+ rowRecord.addField(Binary.valueOf(info.getClassName()), TSDataType.TEXT);
+ listDataSet.putRecord(rowRecord);
+ }
+ }
+
+ private QueryDataSet processShowContinuousQueries() {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_NAME, false),
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL, false),
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL, false),
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_QUERY_SQL, false),
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_TARGET_PATH, false)),
+ Arrays.asList(
+ TSDataType.TEXT,
+ TSDataType.INT64,
+ TSDataType.INT64,
+ TSDataType.TEXT,
+ TSDataType.TEXT));
+
+ List<ShowContinuousQueriesResult> continuousQueriesList =
+ ContinuousQueryService.getInstance().getShowContinuousQueriesResultList();
+
+ for (ShowContinuousQueriesResult result : continuousQueriesList) {
+ RowRecord record = new RowRecord(0);
+ record.addField(Binary.valueOf(result.getContinuousQueryName()), TSDataType.TEXT);
+ record.addField(result.getEveryInterval(), TSDataType.INT64);
+ record.addField(result.getForInterval(), TSDataType.INT64);
+ record.addField(Binary.valueOf(result.getQuerySql()), TSDataType.TEXT);
+ record.addField(Binary.valueOf(result.getTargetPath().getFullPath()), TSDataType.TEXT);
+ listDataSet.putRecord(record);
+ }
+
+ return listDataSet;
+ }
+
+ private void appendNativeFunctions(ListDataSet listDataSet, ShowFunctionsPlan showPlan) {
+ final Binary functionType = Binary.valueOf(FUNCTION_TYPE_NATIVE);
+ final Binary className = Binary.valueOf("");
+ for (String functionName : SQLConstant.getNativeFunctionNames()) {
+ RowRecord rowRecord = new RowRecord(0); // ignore timestamp
+ rowRecord.addField(Binary.valueOf(functionName.toUpperCase()), TSDataType.TEXT);
+ rowRecord.addField(functionType, TSDataType.TEXT);
+ rowRecord.addField(className, TSDataType.TEXT);
+ listDataSet.putRecord(rowRecord);
+ }
+ }
+
+ private QueryDataSet processShowTriggers() {
+ return TriggerRegistrationService.getInstance().show();
+ }
+
+ private void addRowRecordForShowQuery(
+ ListDataSet listDataSet, int timestamp, String item, String value) {
+ RowRecord rowRecord = new RowRecord(timestamp);
+ Field itemField = new Field(TSDataType.TEXT);
+ itemField.setBinaryV(new Binary(item));
+ Field valueField = new Field(TSDataType.TEXT);
+ valueField.setBinaryV(new Binary(value));
+ rowRecord.addField(itemField);
+ rowRecord.addField(valueField);
+ listDataSet.putRecord(rowRecord);
+ }
+
+ @Override
+ public void delete(DeletePlan deletePlan) throws QueryProcessException {
+ AUDIT_LOGGER.info(
+ "delete data from {} in [{},{}]",
+ deletePlan.getPaths(),
+ deletePlan.getDeleteStartTime(),
+ deletePlan.getDeleteEndTime());
+ for (PartialPath path : deletePlan.getPaths()) {
+ delete(
+ path,
+ deletePlan.getDeleteStartTime(),
+ deletePlan.getDeleteEndTime(),
+ deletePlan.getIndex(),
+ deletePlan.getPartitionFilter());
+ }
+ }
+
+ private void operateLoadFiles(OperateFilePlan plan) throws QueryProcessException {
+ File file = plan.getFile();
+ if (!file.exists()) {
+ throw new QueryProcessException(
+ String.format("File path %s doesn't exists.", file.getPath()));
+ }
+ if (file.isDirectory()) {
+ loadDir(file, plan);
+ } else {
+ loadFile(file, plan);
+ }
+ }
+
+ private void loadDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
+ File[] files = curFile.listFiles();
+ long[] establishTime = new long[files.length];
+ List<Integer> tsfiles = new ArrayList<>();
+
+ for (int i = 0; i < files.length; i++) {
+ File file = files[i];
+ if (!file.isDirectory()) {
+ String fileName = file.getName();
+ if (fileName.endsWith(TSFILE_SUFFIX)) {
+ establishTime[i] = Long.parseLong(fileName.split(FILE_NAME_SEPARATOR)[0]);
+ tsfiles.add(i);
+ }
+ }
+ }
+ Collections.sort(
+ tsfiles,
+ (o1, o2) -> {
+ if (establishTime[o1] == establishTime[o2]) {
+ return 0;
+ }
+ return establishTime[o1] < establishTime[o2] ? -1 : 1;
+ });
+ for (Integer i : tsfiles) {
+ loadFile(files[i], plan);
+ }
+
+ for (File file : files) {
+ if (file.isDirectory()) {
+ loadDir(file, plan);
+ }
+ }
+ }
+
+ private void loadFile(File file, OperateFilePlan plan) throws QueryProcessException {
+ if (!file.getName().endsWith(TSFILE_SUFFIX)) {
+ return;
+ }
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setClosed(true);
+ try {
+ // check file
+ RestorableTsFileIOWriter restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
+ if (restorableTsFileIOWriter.hasCrashed()) {
+ restorableTsFileIOWriter.close();
+ throw new QueryProcessException(
+ String.format(
+ "Cannot load file %s because the file has crashed.", file.getAbsolutePath()));
+ }
+ Map<Path, IMeasurementSchema> schemaMap = new HashMap<>();
+
+ List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
+ reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
+ if (plan.getVerifyMetadata()) {
+ loadNewTsFileVerifyMetadata(reader);
+ }
+ } catch (IOException e) {
+ logger.warn("can not get timeseries metadata from {}.", file.getAbsoluteFile());
+ throw new QueryProcessException(e.getMessage());
+ }
+
+ FileLoaderUtils.checkTsFileResource(tsFileResource);
+ if (UpgradeUtils.isNeedUpgrade(tsFileResource)) {
+ throw new QueryProcessException(
+ String.format(
+ "Cannot load file %s because the file's version is old which needs to be upgraded.",
+ file.getAbsolutePath()));
+ }
+
+ // create schemas if they doesn't exist
+ if (plan.isAutoCreateSchema()) {
+ createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel());
+ }
+
+ List<TsFileResource> splitResources = new ArrayList();
+ if (tsFileResource.isSpanMultiTimePartitions()) {
+ logger.info(
+ "try to split the tsFile={} du to it spans multi partitions",
+ tsFileResource.getTsFile().getPath());
+ TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResources);
+ tsFileResource.writeLock();
+ tsFileResource.removeModFile();
+ tsFileResource.writeUnlock();
+ logger.info(
+ "after split, the old tsFile was split to {} new tsFiles", splitResources.size());
+ }
+
+ if (splitResources.isEmpty()) {
+ splitResources.add(tsFileResource);
+ }
+
+ for (TsFileResource resource : splitResources) {
+ StorageEngine.getInstance().loadNewTsFile(resource);
+ }
+ } catch (Exception e) {
+ logger.error("fail to load file {}", file.getName(), e);
+ throw new QueryProcessException(
+ String.format("Cannot load file %s because %s", file.getAbsolutePath(), e.getMessage()));
+ }
+ }
+
+ private void loadNewTsFileVerifyMetadata(TsFileSequenceReader tsFileSequenceReader)
+ throws MetadataException, QueryProcessException, IOException {
+ Map<String, List<TimeseriesMetadata>> metadataSet =
+ tsFileSequenceReader.getAllTimeseriesMetadata();
+ for (Map.Entry<String, List<TimeseriesMetadata>> entry : metadataSet.entrySet()) {
+ String deviceId = entry.getKey();
+ PartialPath devicePath = new PartialPath(deviceId);
+ if (!IoTDB.metaManager.isPathExist(devicePath)) {
+ continue;
+ }
+ for (TimeseriesMetadata metadata : entry.getValue()) {
+ PartialPath fullPath =
+ new PartialPath(deviceId + TsFileConstant.PATH_SEPARATOR + metadata.getMeasurementId());
+ if (IoTDB.metaManager.isPathExist(fullPath)) {
+ TSDataType dataType = IoTDB.metaManager.getSeriesType(fullPath);
+ if (dataType != metadata.getTSDataType()) {
+ throw new QueryProcessException(
+ fullPath.getFullPath()
+ + " is "
+ + metadata.getTSDataType().name()
+ + " in the loading TsFile but is "
+ + dataType.name()
+ + " in IoTDB.");
+ }
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ private void createSchemaAutomatically(
+ List<ChunkGroupMetadata> chunkGroupMetadataList,
+ Map<Path, IMeasurementSchema> knownSchemas,
+ int sgLevel)
+ throws MetadataException {
+ if (chunkGroupMetadataList.isEmpty()) {
+ return;
+ }
+
+ Set<PartialPath> registeredSeries = new HashSet<>();
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ String device = chunkGroupMetadata.getDevice();
+ Set<String> existSeriesSet = new HashSet<>();
+ PartialPath devicePath = new PartialPath(device);
+ PartialPath storageGroupPath = MetaUtils.getStorageGroupPathByLevel(devicePath, sgLevel);
+ try {
+ IoTDB.metaManager.setStorageGroup(storageGroupPath);
+ } catch (StorageGroupAlreadySetException alreadySetException) {
+ if (!alreadySetException.getStorageGroupPath().equals(storageGroupPath.getFullPath())) {
+ throw alreadySetException;
+ }
+ }
+ for (PartialPath path :
+ IoTDB.metaManager.getMeasurementPaths(devicePath.concatNode(ONE_LEVEL_PATH_WILDCARD))) {
+ existSeriesSet.add(path.getMeasurement());
+ existSeriesSet.add(path.getMeasurementAlias());
+ }
+ for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+ PartialPath series =
+ new PartialPath(
+ chunkGroupMetadata.getDevice()
+ + TsFileConstant.PATH_SEPARATOR
+ + chunkMetadata.getMeasurementUid());
+ if (!registeredSeries.contains(series)) {
+ registeredSeries.add(series);
+ IMeasurementSchema schema =
+ knownSchemas.get(new Path(series.getDevice(), series.getMeasurement()));
+ if (schema == null) {
+ throw new MetadataException(
+ String.format(
+ "Can not get the schema of measurement [%s]",
+ chunkMetadata.getMeasurementUid()));
+ }
+ if (!existSeriesSet.contains(chunkMetadata.getMeasurementUid())) {
+ IoTDB.metaManager.createTimeseries(
+ series,
+ schema.getType(),
+ schema.getEncodingType(),
+ schema.getCompressor(),
+ Collections.emptyMap());
+ }
+ }
+ }
+ }
+ }
+
+ private void operateRemoveFile(OperateFilePlan plan) throws QueryProcessException {
+ try {
+ if (!StorageEngine.getInstance().deleteTsfile(plan.getFile())) {
+ throw new QueryProcessException(
+ String.format("File %s doesn't exist.", plan.getFile().getName()));
+ }
+ } catch (StorageEngineException | IllegalPathException e) {
+ throw new QueryProcessException(
+ String.format("Cannot remove file because %s", e.getMessage()));
+ }
+ }
+
+ private void operateUnloadFile(OperateFilePlan plan) throws QueryProcessException {
+ if (!plan.getTargetDir().exists() || !plan.getTargetDir().isDirectory()) {
+ throw new QueryProcessException(
+ String.format("Target dir %s is invalid.", plan.getTargetDir().getPath()));
+ }
+ try {
+ if (!StorageEngine.getInstance().unloadTsfile(plan.getFile(), plan.getTargetDir())) {
+ throw new QueryProcessException(
+ String.format("File %s doesn't exist.", plan.getFile().getName()));
+ }
+ } catch (StorageEngineException | IllegalPathException e) {
+ throw new QueryProcessException(
+ String.format(
+ "Cannot unload file %s to target directory %s because %s",
+ plan.getFile().getPath(), plan.getTargetDir().getPath(), e.getMessage()));
+ }
+ }
+
+ private void operateTTL(SetTTLPlan plan) throws QueryProcessException {
+ try {
+ List<PartialPath> storageGroupPaths =
+ IoTDB.metaManager.getMatchedStorageGroups(plan.getStorageGroup());
+ for (PartialPath storagePath : storageGroupPaths) {
+ IoTDB.metaManager.setTTL(storagePath, plan.getDataTTL());
+ StorageEngine.getInstance().setTTL(storagePath, plan.getDataTTL());
+ }
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ } catch (IOException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void update(PartialPath path, long startTime, long endTime, String value) {
+ throw new UnsupportedOperationException("update is not supported now");
+ }
+
+ @Override
+ public void delete(
+ PartialPath path,
+ long startTime,
+ long endTime,
+ long planIndex,
+ TimePartitionFilter timePartitionFilter)
+ throws QueryProcessException {
+ try {
+ StorageEngine.getInstance().delete(path, startTime, endTime, planIndex, timePartitionFilter);
+ } catch (StorageEngineException e) {
+ throw new QueryProcessException(e);
+ }
+ }
+
+ private void checkFailedMeasurments(InsertPlan plan)
+ throws PathNotExistException, StorageEngineException {
+ // check if all path not exist exceptions
+ List<String> failedPaths = plan.getFailedMeasurements();
+ List<Exception> exceptions = plan.getFailedExceptions();
+ boolean isPathNotExistException = true;
+ for (Exception e : exceptions) {
+ Throwable curException = e;
+ while (curException.getCause() != null) {
+ curException = curException.getCause();
+ }
+ if (!(curException instanceof PathNotExistException)) {
+ isPathNotExistException = false;
+ break;
+ }
+ }
+ if (isPathNotExistException) {
+ throw new PathNotExistException(failedPaths);
+ } else {
+ throw new StorageEngineException(
+ INSERT_MEASUREMENTS_FAILED_MESSAGE
+ + plan.getFailedMeasurements()
+ + (!exceptions.isEmpty() ? (" caused by " + exceptions.get(0).getMessage()) : ""));
+ }
+ }
+
+ @Override
+ public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+ throws QueryProcessException {
+ if (insertRowsOfOneDevicePlan.getRowPlans().length == 0) {
+ return;
+ }
+ try {
+ // insert to storage engine
+ StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
+
+ List<String> notExistedPaths = null;
+ List<String> failedMeasurements = null;
+
+ // If there are some exceptions, we assume they caused by the same reason.
+ Exception exception = null;
+ for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+ if (plan.getFailedMeasurements() != null) {
+ if (notExistedPaths == null) {
+ notExistedPaths = new ArrayList<>();
+ failedMeasurements = new ArrayList<>();
+ }
+ // check if all path not exist exceptions
+ List<String> failedPaths = plan.getFailedMeasurements();
+ List<Exception> exceptions = plan.getFailedExceptions();
+ boolean isPathNotExistException = true;
+ for (Exception e : exceptions) {
+ exception = e;
+ Throwable curException = e;
+ while (curException.getCause() != null) {
+ curException = curException.getCause();
+ }
+ if (!(curException instanceof PathNotExistException)) {
+ isPathNotExistException = false;
+ break;
+ }
+ }
+ if (isPathNotExistException) {
+ notExistedPaths.addAll(failedPaths);
+ } else {
+ failedMeasurements.addAll(plan.getFailedMeasurements());
+ }
+ }
+ }
+ if (notExistedPaths != null && !notExistedPaths.isEmpty()) {
+ throw new PathNotExistException(notExistedPaths);
+ } else if (notExistedPaths != null && !failedMeasurements.isEmpty()) {
+ throw new StorageEngineException(
+ "failed to insert points "
+ + failedMeasurements
+ + (exception != null ? (" caused by " + exception.getMessage()) : ""));
+ }
+
+ } catch (StorageEngineException | MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ }
+
+ @Override
+ public void insert(InsertRowsPlan plan) throws QueryProcessException {
+ for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) {
+ if (plan.getResults().containsKey(i) || plan.isExecuted(i)) {
+ continue;
+ }
+ try {
+ insert(plan.getInsertRowPlanList().get(i));
+ } catch (QueryProcessException e) {
+ plan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!plan.getResults().isEmpty()) {
+ throw new BatchProcessException(plan.getFailingStatus());
+ }
+ }
+
+ @Override
+ public void insert(InsertRowPlan insertRowPlan) throws QueryProcessException {
+ try {
+ insertRowPlan.setMeasurementMNodes(
+ new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
+ // When insert data with sql statement, the data types will be null here.
+ // We need to predicted the data types first
+ if (insertRowPlan.getDataTypes()[0] == null) {
+ for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
+ insertRowPlan.getDataTypes()[i] =
+ TypeInferenceUtils.getPredictedDataType(
+ insertRowPlan.getValues()[i], insertRowPlan.isNeedInferType());
+ }
+ }
+
+ StorageEngine.getInstance().insert(insertRowPlan);
+
+ if (insertRowPlan.getFailedMeasurements() != null) {
+ checkFailedMeasurments(insertRowPlan);
+ }
+ } catch (StorageEngineException | MetadataException e) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ StatMonitor.getInstance().updateFailedStatValue();
+ }
+ throw new QueryProcessException(e);
+ } catch (Exception e) {
+ // update failed statistics
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ StatMonitor.getInstance().updateFailedStatValue();
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan)
+ throws QueryProcessException {
+ if (insertMultiTabletPlan.isEnableMultiThreading()) {
+ insertTabletParallel(insertMultiTabletPlan);
+ } else {
+ insertTabletSerial(insertMultiTabletPlan);
+ }
+ }
+
+ private void insertTabletSerial(InsertMultiTabletPlan insertMultiTabletPlan)
+ throws BatchProcessException {
+ for (int i = 0; i < insertMultiTabletPlan.getInsertTabletPlanList().size(); i++) {
+ if (insertMultiTabletPlan.getResults().containsKey(i)
+ || insertMultiTabletPlan.isExecuted(i)) {
+ continue;
+ }
+ try {
+ insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+ } catch (QueryProcessException e) {
+ insertMultiTabletPlan
+ .getResults()
+ .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!insertMultiTabletPlan.getResults().isEmpty()) {
+ throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+ }
+ }
+
+ private void insertTabletParallel(InsertMultiTabletPlan insertMultiTabletPlan)
+ throws BatchProcessException {
+ updateInsertTabletsPool(insertMultiTabletPlan.getDifferentStorageGroupsCount());
+
+ List<InsertTabletPlan> planList = insertMultiTabletPlan.getInsertTabletPlanList();
+ List<Future<?>> futureList = new ArrayList<>();
+
+ Map<Integer, TSStatus> results = insertMultiTabletPlan.getResults();
+
+ List<InsertTabletPlan> runPlanList = new ArrayList<>();
+ Map<Integer, Integer> runIndexToRealIndex = new HashMap<>();
+ for (int i = 0; i < planList.size(); i++) {
+ if (!(results.containsKey(i) || insertMultiTabletPlan.isExecuted(i))) {
+ runPlanList.add(planList.get(i));
+ runIndexToRealIndex.put(runPlanList.size() - 1, i);
+ }
+ }
+ for (InsertTabletPlan plan : runPlanList) {
+ Future<?> f =
+ insertionPool.submit(
+ () -> {
+ insertTablet(plan);
+ return null;
+ });
+ futureList.add(f);
+ }
+ for (int i = 0; i < futureList.size(); i++) {
+ try {
+ futureList.get(i).get();
+ } catch (Exception e) {
+ if (e.getCause() instanceof QueryProcessException) {
+ QueryProcessException qe = (QueryProcessException) e.getCause();
+ results.put(
+ runIndexToRealIndex.get(i), RpcUtils.getStatus(qe.getErrorCode(), qe.getMessage()));
+ } else {
+ results.put(
+ runIndexToRealIndex.get(i),
+ RpcUtils.getStatus(INTERNAL_SERVER_ERROR, e.getMessage()));
+ }
+ }
+ }
+
+ if (!results.isEmpty()) {
+ throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+ }
+ }
+
+ private void updateInsertTabletsPool(int sgSize) {
+ int updateCoreSize = Math.min(sgSize, Runtime.getRuntime().availableProcessors() / 2);
+ if (insertionPool == null || insertionPool.isTerminated()) {
+ insertionPool =
+ (ThreadPoolExecutor)
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ updateCoreSize, ThreadName.INSERTION_SERVICE.getName());
+ } else if (insertionPool.getCorePoolSize() > updateCoreSize) {
+ insertionPool.setCorePoolSize(updateCoreSize);
+ insertionPool.setMaximumPoolSize(updateCoreSize);
+ } else if (insertionPool.getCorePoolSize() < updateCoreSize) {
+ insertionPool.setMaximumPoolSize(updateCoreSize);
+ insertionPool.setCorePoolSize(updateCoreSize);
+ }
+ }
+
+ @Override
+ public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
+
+ if (insertTabletPlan.getRowCount() == 0) {
+ return;
+ }
+ try {
+ insertTabletPlan.setMeasurementMNodes(
+ new IMeasurementMNode[insertTabletPlan.getMeasurements().length]);
+
+ StorageEngine.getInstance().insertTablet(insertTabletPlan);
+
+ if (insertTabletPlan.getFailedMeasurements() != null) {
+ checkFailedMeasurments(insertTabletPlan);
+ }
+ } catch (StorageEngineException | MetadataException e) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ StatMonitor.getInstance().updateFailedStatValue();
+ }
+ throw new QueryProcessException(e);
+ } catch (Exception e) {
+ // update failed statistics
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ StatMonitor.getInstance().updateFailedStatValue();
+ }
+ throw e;
+ }
+ }
+
+ private boolean operateAuthor(AuthorPlan author) throws QueryProcessException {
+ AuthorOperator.AuthorType authorType = author.getAuthorType();
+ String userName = author.getUserName();
+ String roleName = author.getRoleName();
+ String password = author.getPassword();
+ String newPassword = author.getNewPassword();
+ Set<Integer> permissions = author.getPermissions();
+ PartialPath nodeName = author.getNodeName();
+ try {
+ switch (authorType) {
+ case UPDATE_USER:
+ authorizer.updateUserPassword(userName, newPassword);
+ break;
+ case CREATE_USER:
+ authorizer.createUser(userName, password);
+ break;
+ case CREATE_ROLE:
+ authorizer.createRole(roleName);
+ break;
+ case DROP_USER:
+ authorizer.deleteUser(userName);
+ break;
+ case DROP_ROLE:
+ authorizer.deleteRole(roleName);
+ break;
+ case GRANT_ROLE:
+ for (int i : permissions) {
+ authorizer.grantPrivilegeToRole(roleName, nodeName.getFullPath(), i);
+ }
+ break;
+ case GRANT_USER:
+ for (int i : permissions) {
+ authorizer.grantPrivilegeToUser(userName, nodeName.getFullPath(), i);
+ }
+ break;
+ case GRANT_ROLE_TO_USER:
+ authorizer.grantRoleToUser(roleName, userName);
+ break;
+ case REVOKE_USER:
+ for (int i : permissions) {
+ authorizer.revokePrivilegeFromUser(userName, nodeName.getFullPath(), i);
+ }
+ break;
+ case REVOKE_ROLE:
+ for (int i : permissions) {
+ authorizer.revokePrivilegeFromRole(roleName, nodeName.getFullPath(), i);
+ }
+ break;
+ case REVOKE_ROLE_FROM_USER:
+ authorizer.revokeRoleFromUser(roleName, userName);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported operation " + authorType);
+ }
+ } catch (AuthException e) {
+ throw new QueryProcessException(e.getMessage(), true);
+ }
+ return true;
+ }
+
+ private boolean operateWatermarkEmbedding(List<String> users, boolean useWatermark)
+ throws QueryProcessException {
+ try {
+ for (String user : users) {
+ authorizer.setUserUseWaterMark(user, useWatermark);
+ }
+ } catch (AuthException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ return true;
+ }
+
+ private boolean createTimeSeries(CreateTimeSeriesPlan createTimeSeriesPlan)
+ throws QueryProcessException {
+ try {
+ IoTDB.metaManager.createTimeseries(createTimeSeriesPlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ private boolean createAlignedTimeSeries(CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan)
+ throws QueryProcessException {
+ try {
+ IoTDB.metaManager.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ @SuppressWarnings("squid:S3776") // high Cognitive Complexity
+ private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan multiPlan)
+ throws BatchProcessException {
+ int dataTypeIdx = 0;
+ for (int i = 0; i < multiPlan.getPaths().size(); i++) {
+ if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) {
+ continue;
+ }
+ PartialPath path = multiPlan.getPaths().get(i);
+ String measurement = path.getMeasurement();
+ CreateTimeSeriesPlan plan =
+ new CreateTimeSeriesPlan(
+ multiPlan.getPaths().get(i),
+ multiPlan.getDataTypes().get(i),
+ multiPlan.getEncodings().get(i),
+ multiPlan.getCompressors().get(i),
+ multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+ multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+ multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+ multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+ dataTypeIdx++;
+ try {
+ createTimeSeries(plan);
+ } catch (QueryProcessException e) {
+ multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!multiPlan.getResults().isEmpty()) {
+ throw new BatchProcessException(multiPlan.getFailingStatus());
+ }
+ return true;
+ }
+
+ protected boolean deleteTimeSeries(DeleteTimeSeriesPlan deleteTimeSeriesPlan)
+ throws QueryProcessException {
+ AUDIT_LOGGER.info("delete timeseries {}", deleteTimeSeriesPlan.getPaths());
+ List<PartialPath> deletePathList = deleteTimeSeriesPlan.getPaths();
+ for (int i = 0; i < deletePathList.size(); i++) {
+ PartialPath path = deletePathList.get(i);
+ try {
+ StorageEngine.getInstance()
+ .deleteTimeseries(
+ path, deleteTimeSeriesPlan.getIndex(), deleteTimeSeriesPlan.getPartitionFilter());
+ String failed = IoTDB.metaManager.deleteTimeseries(path);
+ if (failed != null) {
+ deleteTimeSeriesPlan
+ .getResults()
+ .put(i, RpcUtils.getStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR, failed));
+ }
+ } catch (StorageEngineException | MetadataException e) {
+ deleteTimeSeriesPlan
+ .getResults()
+ .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!deleteTimeSeriesPlan.getResults().isEmpty()) {
+ throw new BatchProcessException(deleteTimeSeriesPlan.getFailingStatus());
+ }
+ return true;
+ }
+
+ private boolean alterTimeSeries(AlterTimeSeriesPlan alterTimeSeriesPlan)
+ throws QueryProcessException {
+ PartialPath path = alterTimeSeriesPlan.getPath();
+ Map<String, String> alterMap = alterTimeSeriesPlan.getAlterMap();
+ try {
+ switch (alterTimeSeriesPlan.getAlterType()) {
+ case RENAME:
+ String beforeName = alterMap.keySet().iterator().next();
+ String currentName = alterMap.get(beforeName);
+ IoTDB.metaManager.renameTagOrAttributeKey(beforeName, currentName, path);
+ break;
+ case SET:
+ IoTDB.metaManager.setTagsOrAttributesValue(alterMap, path);
+ break;
+ case DROP:
+ IoTDB.metaManager.dropTagsOrAttributes(alterMap.keySet(), path);
+ break;
+ case ADD_TAGS:
+ IoTDB.metaManager.addTags(alterMap, path);
+ break;
+ case ADD_ATTRIBUTES:
+ IoTDB.metaManager.addAttributes(alterMap, path);
+ break;
+ case UPSERT:
+ IoTDB.metaManager.upsertTagsAndAttributes(
+ alterTimeSeriesPlan.getAlias(),
+ alterTimeSeriesPlan.getTagsMap(),
+ alterTimeSeriesPlan.getAttributesMap(),
+ path);
+ break;
+ }
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ } catch (IOException e) {
+ throw new QueryProcessException(
+ String.format(
+ "Something went wrong while read/write the [%s]'s tag/attribute info.",
+ path.getFullPath()));
+ }
+ return true;
+ }
+
+ public boolean setStorageGroup(SetStorageGroupPlan setStorageGroupPlan)
+ throws QueryProcessException {
+ AUDIT_LOGGER.info("set storage group to {}", setStorageGroupPlan.getPaths());
+ PartialPath path = setStorageGroupPlan.getPath();
+ try {
+ IoTDB.metaManager.setStorageGroup(path);
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ protected boolean deleteStorageGroups(DeleteStorageGroupPlan deleteStorageGroupPlan)
+ throws QueryProcessException {
+ AUDIT_LOGGER.info("delete storage group {}", deleteStorageGroupPlan.getPaths());
+ List<PartialPath> deletePathList = new ArrayList<>();
+ try {
+ for (PartialPath storageGroupPath : deleteStorageGroupPlan.getPaths()) {
+ List<PartialPath> allRelatedStorageGroupPath =
+ IoTDB.metaManager.getMatchedStorageGroups(storageGroupPath);
+ if (allRelatedStorageGroupPath.isEmpty()) {
+ throw new PathNotExistException(storageGroupPath.getFullPath(), true);
+ }
+ for (PartialPath path : allRelatedStorageGroupPath) {
+ StorageEngine.getInstance().deleteStorageGroup(path);
+ deletePathList.add(path);
+ }
+ }
+ IoTDB.metaManager.deleteStorageGroups(deletePathList);
+ operateClearCache();
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ return true;
+ }
+
+ protected QueryDataSet processAuthorQuery(AuthorPlan plan) throws QueryProcessException {
+ AuthorType authorType = plan.getAuthorType();
+ String userName = plan.getUserName();
+ String roleName = plan.getRoleName();
+ PartialPath path = plan.getNodeName();
+
+ ListDataSet dataSet;
+
+ try {
+ switch (authorType) {
+ case LIST_ROLE:
+ dataSet = executeListRole(plan);
+ break;
+ case LIST_USER:
+ dataSet = executeListUser(plan);
+ break;
+ case LIST_ROLE_USERS:
+ dataSet = executeListRoleUsers(roleName);
+ break;
+ case LIST_USER_ROLES:
+ dataSet = executeListUserRoles(userName);
+ break;
+ case LIST_ROLE_PRIVILEGE:
+ dataSet = executeListRolePrivileges(roleName, path);
+ break;
+ case LIST_USER_PRIVILEGE:
+ dataSet = executeListUserPrivileges(userName, path);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported operation " + authorType);
+ }
+ } catch (AuthException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ return dataSet;
+ }
+
+ private ListDataSet executeListRole(AuthorPlan plan) throws AuthException {
+ ListDataSet dataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_ROLE, false)),
+ Collections.singletonList(TSDataType.TEXT));
+
+ // check if current user is granted list_role privilege
+ boolean hasListRolePrivilege =
+ AuthorityChecker.check(
+ plan.getLoginUserName(),
+ Collections.emptyList(),
+ plan.getOperatorType(),
+ plan.getLoginUserName());
+ if (!hasListRolePrivilege) {
+ return dataSet;
+ }
+
+ List<String> roleList = authorizer.listAllRoles();
+ addToDataSet(roleList, dataSet);
+ return dataSet;
+ }
+
+ private void addToDataSet(List<String> strResults, ListDataSet dataSet) {
+ int index = 0;
+ for (String role : strResults) {
+ RowRecord record = new RowRecord(index++);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(role));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ }
+
+ private ListDataSet executeListUser(AuthorPlan plan) throws AuthException {
+ ListDataSet dataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_USER, false)),
+ Collections.singletonList(TSDataType.TEXT));
+
+ // check if current user is granted list_user privilege
+ boolean hasListUserPrivilege =
+ AuthorityChecker.check(
+ plan.getLoginUserName(),
+ Collections.singletonList((plan.getNodeName())),
+ plan.getOperatorType(),
+ plan.getLoginUserName());
+ if (!hasListUserPrivilege) {
+ return dataSet;
+ }
+
+ List<String> userList = authorizer.listAllUsers();
+ addToDataSet(userList, dataSet);
+ return dataSet;
+ }
+
+ private ListDataSet executeListRoleUsers(String roleName) throws AuthException {
+ Role role = authorizer.getRole(roleName);
+ if (role == null) {
+ throw new AuthException("No such role : " + roleName);
+ }
+ ListDataSet dataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_USER, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ List<String> userList = authorizer.listAllUsers();
+ int index = 0;
+ for (String userN : userList) {
+ User userObj = authorizer.getUser(userN);
+ if (userObj != null && userObj.hasRole(roleName)) {
+ RowRecord record = new RowRecord(index++);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(userN));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ }
+ return dataSet;
+ }
+
+ private ListDataSet executeListUserRoles(String userName) throws AuthException {
+ User user = authorizer.getUser(userName);
+ if (user != null) {
+ ListDataSet dataSet =
+ new ListDataSet(
+ Collections.singletonList(new PartialPath(COLUMN_ROLE, false)),
+ Collections.singletonList(TSDataType.TEXT));
+ int index = 0;
+ for (String roleN : user.getRoleList()) {
+ RowRecord record = new RowRecord(index++);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(roleN));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ return dataSet;
+ } else {
+ throw new AuthException("No such user : " + userName);
+ }
+ }
+
+ private ListDataSet executeListRolePrivileges(String roleName, PartialPath path)
+ throws AuthException {
+ Role role = authorizer.getRole(roleName);
+ if (role != null) {
+ List<PartialPath> headerList = new ArrayList<>();
+ List<TSDataType> typeList = new ArrayList<>();
+ headerList.add(new PartialPath(COLUMN_PRIVILEGE, false));
+ typeList.add(TSDataType.TEXT);
+ ListDataSet dataSet = new ListDataSet(headerList, typeList);
+ int index = 0;
+ for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+ if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+ RowRecord record = new RowRecord(index++);
+ Field field = new Field(TSDataType.TEXT);
+ field.setBinaryV(new Binary(pathPrivilege.toString()));
+ record.addField(field);
+ dataSet.putRecord(record);
+ }
+ }
+ return dataSet;
+ } else {
+ throw new AuthException("No such role : " + roleName);
+ }
+ }
+
+ private ListDataSet executeListUserPrivileges(String userName, PartialPath path)
+ throws AuthException {
+ User user = authorizer.getUser(userName);
+ if (user == null) {
+ throw new AuthException("No such user : " + userName);
+ }
+ List<PartialPath> headerList = new ArrayList<>();
+ List<TSDataType> typeList = new ArrayList<>();
+ headerList.add(new PartialPath(COLUMN_ROLE, false));
+ headerList.add(new PartialPath(COLUMN_PRIVILEGE, false));
+ typeList.add(TSDataType.TEXT);
+ typeList.add(TSDataType.TEXT);
+ ListDataSet dataSet = new ListDataSet(headerList, typeList);
+ int index = 0;
+ for (PathPrivilege pathPrivilege : user.getPrivilegeList()) {
+ if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+ RowRecord record = new RowRecord(index++);
+ Field roleF = new Field(TSDataType.TEXT);
+ roleF.setBinaryV(new Binary(""));
+ record.addField(roleF);
+ Field privilegeF = new Field(TSDataType.TEXT);
+ privilegeF.setBinaryV(new Binary(pathPrivilege.toString()));
+ record.addField(privilegeF);
+ dataSet.putRecord(record);
+ }
+ }
+ for (String roleN : user.getRoleList()) {
+ Role role = authorizer.getRole(roleN);
+ if (role == null) {
+ continue;
+ }
+ for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+ if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+ RowRecord record = new RowRecord(index++);
+ Field roleF = new Field(TSDataType.TEXT);
+ roleF.setBinaryV(new Binary(roleN));
+ record.addField(roleF);
+ Field privilegeF = new Field(TSDataType.TEXT);
+ privilegeF.setBinaryV(new Binary(pathPrivilege.toString()));
+ record.addField(privilegeF);
+ dataSet.putRecord(record);
+ }
+ }
+ }
+ return dataSet;
+ }
+
+ @SuppressWarnings("unused") // for the distributed version
+ protected void loadConfiguration(LoadConfigurationPlan plan) throws QueryProcessException {
+ IoTDBDescriptor.getInstance().loadHotModifiedProps();
+ }
+
+ private QueryDataSet processShowMergeStatus() {
+ List<PartialPath> headerList = new ArrayList<>();
+ List<TSDataType> typeList = new ArrayList<>();
+ headerList.add(new PartialPath(COLUMN_STORAGE_GROUP, false));
+ headerList.add(new PartialPath(COLUMN_TASK_NAME, false));
+ headerList.add(new PartialPath(COLUMN_CREATED_TIME, false));
+ headerList.add(new PartialPath(COLUMN_PROGRESS, false));
+ headerList.add(new PartialPath(COLUMN_CANCELLED, false));
+ headerList.add(new PartialPath(COLUMN_DONE, false));
+
+ typeList.add(TSDataType.TEXT);
+ typeList.add(TSDataType.TEXT);
+ typeList.add(TSDataType.TEXT);
+ typeList.add(TSDataType.TEXT);
+ typeList.add(TSDataType.BOOLEAN);
+ typeList.add(TSDataType.BOOLEAN);
+ ListDataSet dataSet = new ListDataSet(headerList, typeList);
+ Map<String, List<TaskStatus>>[] taskStatus = MergeManager.getINSTANCE().collectTaskStatus();
+ for (Map<String, List<TaskStatus>> statusMap : taskStatus) {
+ for (Entry<String, List<TaskStatus>> stringListEntry : statusMap.entrySet()) {
+ for (TaskStatus status : stringListEntry.getValue()) {
+ dataSet.putRecord(toRowRecord(status, stringListEntry.getKey()));
+ }
+ }
+ }
+ return dataSet;
+ }
+
+ public RowRecord toRowRecord(TaskStatus status, String storageGroup) {
+ RowRecord record = new RowRecord(0);
+ record.addField(new Binary(storageGroup), TSDataType.TEXT);
+ record.addField(new Binary(status.getTaskName()), TSDataType.TEXT);
+ record.addField(new Binary(status.getCreatedTime()), TSDataType.TEXT);
+ record.addField(new Binary(status.getProgress()), TSDataType.TEXT);
+ record.addField(status.isCancelled(), TSDataType.BOOLEAN);
+ record.addField(status.isDone(), TSDataType.BOOLEAN);
+ return record;
+ }
+
+ private QueryDataSet processShowQueryProcesslist() {
+ ListDataSet listDataSet =
+ new ListDataSet(
+ Arrays.asList(new PartialPath(QUERY_ID, false), new PartialPath(STATEMENT, false)),
+ Arrays.asList(TSDataType.INT64, TSDataType.TEXT));
+ QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+ for (Entry<Long, QueryContext> context : queryTimeManager.getQueryContextMap().entrySet()) {
+ RowRecord record = new RowRecord(context.getValue().getStartTime());
+ record.addField(context.getKey(), TSDataType.INT64);
+ record.addField(new Binary(context.getValue().getStatement()), TSDataType.TEXT);
+ listDataSet.putRecord(record);
+ }
+ return listDataSet;
+ }
+
+ /**
+ * @param storageGroups the storage groups to check
+ * @return List of PartialPath the storage groups that not exist
+ */
+ List<PartialPath> checkStorageGroupExist(List<PartialPath> storageGroups) {
+ List<PartialPath> noExistSg = new ArrayList<>();
+ if (storageGroups == null) {
+ return noExistSg;
+ }
+ for (PartialPath storageGroup : storageGroups) {
+ if (!IoTDB.metaManager.isStorageGroup(storageGroup)) {
+ noExistSg.add(storageGroup);
+ }
+ }
+ return noExistSg;
+ }
+
+ private void settle(SettlePlan plan) throws StorageEngineException {
+ if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+ throw new StorageEngineException(
+ "Current system mode is read only, does not support file settle");
+ }
+ if (!SettleService.getINSTANCE().isRecoverFinish()) {
+ throw new StorageEngineException("Existing sg that is not ready, please try later.");
+ }
+ PartialPath sgPath = null;
+ try {
+ List<TsFileResource> seqResourcesToBeSettled = new ArrayList<>();
+ List<TsFileResource> unseqResourcesToBeSettled = new ArrayList<>();
+ List<String> tsFilePaths = new ArrayList<>();
+ if (plan.isSgPath()) {
+ sgPath = plan.getSgPath();
+ } else {
+ String tsFilePath = plan.getTsFilePath();
+ if (new File(tsFilePath).isDirectory()) {
+ throw new WriteProcessException("The file should not be a directory.");
+ } else if (!new File(tsFilePath).exists()) {
+ throw new WriteProcessException("The tsFile " + tsFilePath + " is not existed.");
+ }
+ sgPath = SettleService.getINSTANCE().getSGByFilePath(tsFilePath);
+ tsFilePaths.add(tsFilePath);
+ }
+ StorageEngine.getInstance()
+ .getResourcesToBeSettled(
+ sgPath, seqResourcesToBeSettled, unseqResourcesToBeSettled, tsFilePaths);
+ SettleService.getINSTANCE().startSettling(seqResourcesToBeSettled, unseqResourcesToBeSettled);
+ StorageEngine.getInstance().setSettling(sgPath, false);
+ } catch (WriteProcessException e) {
+ if (sgPath != null) {
+ StorageEngine.getInstance().setSettling(sgPath, false);
+ }
+ throw new StorageEngineException(e.getMessage());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
index 6f8e823a4e..c9bcdf1cd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -33,8 +33,6 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -44,6 +42,8 @@ import org.apache.iotdb.db.query.control.QueryTimeManager;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.SessionTimeoutManager;
import org.apache.iotdb.db.query.control.tracing.TracingManager;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 27bba9d092..11a2026e5c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -39,10 +39,10 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
index 9766bfc85b..860bdd0ab7 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
index 526226884f..e3e21b1cb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
index e70e5ee404..21284733fe 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -34,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
index 2a596895e9..60ba99f132 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
@@ -26,10 +26,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
index 8aeb162b48..a449f92ff0 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
@@ -28,10 +28,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
index 329e4bf090..f7fd194709 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
@@ -28,10 +28,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
index 5677ad27c8..4f8e894bb8 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
index 062bbc3611..db9584f819 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
@@ -23,10 +23,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
index 75213a288a..bd0cb839cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
index d33d1c9230..ab955ed0f8 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
@@ -36,6 +36,8 @@ import java.nio.ByteBuffer;
/** Unit tests of AggregateResult without desc aggregate result. */
public class AggregateResultTest {
+ private static final double maxError = 0.0001d;
+
@Test
public void avgAggrResultTest() throws QueryProcessException, IOException {
AggregateResult avgAggrResult1 =
@@ -291,4 +293,36 @@ public class AggregateResultTest {
AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
Assert.assertEquals(2d, (double) result.getResult(), 0.01);
}
+
+ // @Test
+ // public void validityAggrResultTest() throws QueryProcessException, IOException {
+ // AggregateResult validityAggrResult1 =
+ // AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE,
+ // true);
+ // AggregateResult validityAggrResult2 =
+ // AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE,
+ // true);
+ //
+ // Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ // Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ // statistics1.update(1L, 1d);
+ // statistics1.update(2L, 1d);
+ // statistics2.update(3L, 1d);
+ // statistics2.update(4L, 3d);
+ //
+ // validityAggrResult1.updateResultFromStatistics(statistics1);
+ // validityAggrResult2.updateResultFromStatistics(statistics2);
+ // // System.out.println(validityAggrResult1.getResult());
+ // // System.out.println(validityAggrResult2.getResult());
+ // validityAggrResult1.merge(validityAggrResult2);
+ // // System.out.println(validityAggrResult1.getResult());
+ //
+ // // Assert.assertEquals(0.25, (double) validityAggrResult1.getResult(), maxError);
+ //
+ // ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ // validityAggrResult1.serializeTo(outputStream);
+ // ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+ // AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
+ // Assert.assertEquals(0.25, (double) result.getResult(), maxError);
+ // }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
index d038b3467f..56fb6ef09d 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
@@ -19,172 +19,37 @@
package org.apache.iotdb.db.query.aggregation;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
-import org.apache.iotdb.tsfile.utils.Binary;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
/** Unit tests of desc aggregate result. */
public class DescAggregateResultTest {
- @Test
- public void maxTimeDescAggrResultTest() throws QueryProcessException, IOException {
- AggregateResult maxTimeDescAggrResult =
- AggregateResultFactory.getAggrResultByName(SQLConstant.MAX_TIME, TSDataType.FLOAT, false);
-
- Statistics statistics1 = Statistics.getStatsByType(TSDataType.FLOAT);
- Statistics statistics2 = Statistics.getStatsByType(TSDataType.FLOAT);
- statistics1.update(10L, 10.0f);
- statistics2.update(1L, 1.0f);
-
- maxTimeDescAggrResult.updateResultFromStatistics(statistics1);
- Assert.assertEquals(10L, (long) maxTimeDescAggrResult.getResult());
- maxTimeDescAggrResult.updateResultFromStatistics(statistics2);
- Assert.assertEquals(10L, (long) maxTimeDescAggrResult.getResult());
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- maxTimeDescAggrResult.serializeTo(outputStream);
- ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
- AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
- Assert.assertEquals(10L, (long) result.getResult());
-
- maxTimeDescAggrResult.reset();
- BatchData batchData = BatchDataFactory.createBatchData(TSDataType.FLOAT, false, false);
- batchData.putFloat(1, 1.0F);
- batchData.putFloat(2, 2.0F);
- batchData.putFloat(3, 3.0F);
- batchData.putFloat(4, 4.0F);
- batchData.putFloat(5, 5.0F);
- batchData.resetBatchData();
- IBatchDataIterator it = batchData.getBatchDataIterator();
- maxTimeDescAggrResult.updateResultFromPageData(it);
- Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
- it.reset();
- maxTimeDescAggrResult.updateResultFromPageData(it, 2, 5);
- Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
- }
-
- @Test
- public void minTimeDescAggrResultTest() throws QueryProcessException, IOException {
- AggregateResult minTimeDescAggrResult =
- AggregateResultFactory.getAggrResultByName(SQLConstant.MIN_TIME, TSDataType.FLOAT, false);
-
- Statistics statistics1 = Statistics.getStatsByType(TSDataType.FLOAT);
- Statistics statistics2 = Statistics.getStatsByType(TSDataType.FLOAT);
- statistics1.update(10L, 10.0f);
- statistics2.update(1L, 1.0f);
-
- minTimeDescAggrResult.updateResultFromStatistics(statistics1);
- Assert.assertEquals(10L, (long) minTimeDescAggrResult.getResult());
- minTimeDescAggrResult.updateResultFromStatistics(statistics2);
- Assert.assertEquals(1L, (long) minTimeDescAggrResult.getResult());
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- minTimeDescAggrResult.serializeTo(outputStream);
- ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
- AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
- Assert.assertEquals(1L, (long) result.getResult());
-
- minTimeDescAggrResult.reset();
- BatchData batchData = BatchDataFactory.createBatchData(TSDataType.FLOAT, false, false);
- batchData.putFloat(1, 1.0F);
- batchData.putFloat(2, 2.0F);
- batchData.putFloat(3, 3.0F);
- batchData.putFloat(4, 4.0F);
- batchData.putFloat(5, 5.0F);
- batchData.resetBatchData();
- IBatchDataIterator it = batchData.getBatchDataIterator();
- minTimeDescAggrResult.updateResultFromPageData(it);
- Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
- it.reset();
- minTimeDescAggrResult.updateResultFromPageData(it, 1, 3);
- Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
- }
-
- @Test
- public void firstValueDescAggrResultTest() throws QueryProcessException, IOException {
- AggregateResult firstValueDescAggrResult =
- AggregateResultFactory.getAggrResultByName(
- SQLConstant.FIRST_VALUE, TSDataType.BOOLEAN, false);
-
- Statistics statistics1 = Statistics.getStatsByType(TSDataType.BOOLEAN);
- Statistics statistics2 = Statistics.getStatsByType(TSDataType.BOOLEAN);
- statistics1.update(10L, true);
- statistics2.update(1L, false);
-
- firstValueDescAggrResult.updateResultFromStatistics(statistics1);
- Assert.assertEquals(true, firstValueDescAggrResult.getResult());
- firstValueDescAggrResult.updateResultFromStatistics(statistics2);
- Assert.assertEquals(false, firstValueDescAggrResult.getResult());
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- firstValueDescAggrResult.serializeTo(outputStream);
- ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
- AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
- Assert.assertEquals(false, result.getResult());
-
- firstValueDescAggrResult.reset();
- BatchData batchData = BatchDataFactory.createBatchData(TSDataType.BOOLEAN, false, false);
- batchData.putBoolean(1, true);
- batchData.putBoolean(2, false);
- batchData.putBoolean(3, false);
- batchData.putBoolean(4, true);
- batchData.putBoolean(5, false);
- batchData.resetBatchData();
- IBatchDataIterator it = batchData.getBatchDataIterator();
- firstValueDescAggrResult.updateResultFromPageData(it);
- Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
- it.reset();
- firstValueDescAggrResult.updateResultFromPageData(it, 1, 3);
- Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
- }
-
- @Test
- public void lastValueDescAggrResultTest() throws QueryProcessException, IOException {
- AggregateResult lastValueDescAggrResult =
- AggregateResultFactory.getAggrResultByName(SQLConstant.LAST_VALUE, TSDataType.TEXT, false);
-
- Statistics statistics1 = Statistics.getStatsByType(TSDataType.TEXT);
- Statistics statistics2 = Statistics.getStatsByType(TSDataType.TEXT);
- statistics1.update(10L, new Binary("last"));
- statistics2.update(1L, new Binary("first"));
-
- lastValueDescAggrResult.updateResultFromStatistics(statistics1);
- Assert.assertEquals("last", String.valueOf(lastValueDescAggrResult.getResult()));
- lastValueDescAggrResult.updateResultFromStatistics(statistics2);
- Assert.assertEquals("last", String.valueOf(lastValueDescAggrResult.getResult()));
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- lastValueDescAggrResult.serializeTo(outputStream);
- ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
- AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
- Assert.assertEquals("last", String.valueOf(result.getResult()));
-
- lastValueDescAggrResult.reset();
- BatchData batchData = BatchDataFactory.createBatchData(TSDataType.TEXT, false, false);
- batchData.putBinary(1L, new Binary("a"));
- batchData.putBinary(2L, new Binary("b"));
- batchData.putBinary(3L, new Binary("c"));
- batchData.putBinary(4L, new Binary("d"));
- batchData.putBinary(5L, new Binary("e"));
- batchData.resetBatchData();
- IBatchDataIterator it = batchData.getBatchDataIterator();
- lastValueDescAggrResult.updateResultFromPageData(it);
- Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
- it.reset();
- lastValueDescAggrResult.updateResultFromPageData(it, 3, 5);
- Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
- }
+ // @Test
+ // public void validityMergeTest() throws QueryProcessException, IOException {
+ // AggregateResult ValidityAggrResult =
+ // AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE,
+ // true);
+ //
+ // System.out.println(Runtime.getRuntime().freeMemory() / 1024 / 1024);
+ //
+ // BatchData batchData = BatchDataFactory.createBatchData(TSDataType.DOUBLE, true, true);
+ // BatchData batchData2 = BatchDataFactory.createBatchData(TSDataType.DOUBLE, true, true);
+ //
+ // for (int i = 0; i < 3000; i++) {
+ // batchData.putDouble(1623311071000L - 3000 + i, 0d);
+ // }
+ // batchData.putDouble(1623311071000L, 0d);
+ // batchData.putDouble(1623312051000L, 2d);
+ // batchData.putDouble(1623312053000L, 2d);
+ // batchData.putDouble(1623312055000L, 2d);
+ // batchData.putDouble(1623312057000L, 2d);
+ // for (int i = 0; i < 7024; i++) {
+ // batchData2.putDouble(1623312057000L + i, 0d);
+ // }
+ // batchData.resetBatchData();
+ // batchData2.resetBatchData();
+ // IBatchDataIterator it = batchData.getBatchDataIterator();
+ // IBatchDataIterator it2 = batchData2.getBatchDataIterator();
+ // ValidityAggrResult.updateResultFromPageData(it);
+ // it.reset();
+ // ValidityAggrResult.updateResultFromPageData(it2);
+ // }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
index 07fad6d9df..92171f832c 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
index 14873748bd..8909c62d82 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
index 8f9a15b1c1..6aa4073366 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
index 34d7edc939..23ed66971f 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
index ba6578af53..141673fe59 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
index 03cd7c431c..1602a3f207 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
index 30f3468b8b..05f999139b 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
index 522518c5ea..5d7193fc8d 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
index 864b4c0048..b49d9a5f1c 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
@@ -29,8 +29,8 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
import org.apache.iotdb.db.tools.TsFileRewriteTool;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;