You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/08 03:34:10 UTC
[incubator-iotdb] 01/01: refactor iotdb code for cluster
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch refactor_iotdb_for_cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 25c3d66783f805b9ac4c5daf165a7f9eeb25d823
Author: lta <li...@163.com>
AuthorDate: Wed May 8 11:33:06 2019 +0800
refactor iotdb code for cluster
---
.../db/concurrent/IoTDBThreadPoolFactory.java | 8 +-
.../org/apache/iotdb/db/concurrent/ThreadName.java | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 24 ++-
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 6 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 7 +-
.../apache/iotdb/db/engine/pool/FlushManager.java | 6 +-
.../apache/iotdb/db/engine/pool/MergeManager.java | 8 +-
.../java/org/apache/iotdb/db/metadata/MGraph.java | 11 +-
.../org/apache/iotdb/db/metadata/MManager.java | 13 --
.../java/org/apache/iotdb/db/metadata/MTree.java | 41 +---
.../org/apache/iotdb/db/metadata/Metadata.java | 121 +++++++++++-
.../org/apache/iotdb/db/qp/QueryProcessor.java | 11 +-
.../db/qp/executor/IQueryProcessExecutor.java | 143 ++++++++++++++
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 3 +-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 97 +--------
.../org/apache/iotdb/db/qp/logical/Operator.java | 2 +-
.../db/qp/logical/crud/BasicFunctionOperator.java | 3 +-
.../iotdb/db/qp/logical/crud/FilterOperator.java | 5 +-
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 4 +-
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 1 +
.../iotdb/db/qp/physical/crud/DeletePlan.java | 1 +
.../iotdb/db/qp/physical/crud/FillQueryPlan.java | 1 +
.../iotdb/db/qp/physical/crud/GroupByPlan.java | 1 +
.../iotdb/db/qp/physical/crud/InsertPlan.java | 1 +
.../iotdb/db/qp/physical/crud/QueryPlan.java | 5 +-
.../iotdb/db/qp/physical/crud/UpdatePlan.java | 1 +
.../iotdb/db/qp/physical/sys/AuthorPlan.java | 1 +
.../iotdb/db/qp/physical/sys/LoadDataPlan.java | 1 +
.../iotdb/db/qp/physical/sys/MetadataPlan.java | 1 +
.../iotdb/db/qp/physical/sys/PropertyPlan.java | 1 +
.../{writelog => qp/physical}/transfer/Codec.java | 3 +-
.../physical}/transfer/CodecInstances.java | 5 +-
.../physical}/transfer/PhysicalPlanCodec.java | 14 +-
.../transfer/PhysicalPlanLogTransfer.java | 2 +-
.../physical}/transfer/SystemLogOperator.java | 2 +-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 6 +-
.../qp/strategy/optimizer/ConcatPathOptimizer.java | 5 +-
.../db/query/aggregation/AggreResultData.java | 2 +-
.../db/query/aggregation/impl/CountAggrFunc.java | 2 +-
.../db/query/aggregation/impl/FirstAggrFunc.java | 2 +-
.../db/query/aggregation/impl/LastAggrFunc.java | 2 +-
.../db/query/aggregation/impl/MaxTimeAggrFunc.java | 2 +-
.../query/aggregation/impl/MaxValueAggrFunc.java | 2 +-
.../db/query/aggregation/impl/MeanAggrFunc.java | 2 +-
.../db/query/aggregation/impl/MinTimeAggrFunc.java | 2 +-
.../query/aggregation/impl/MinValueAggrFunc.java | 2 +-
.../db/query/control/QueryResourceManager.java | 27 ++-
.../query/dataset/AggreResultDataPointReader.java | 2 +-
.../dataset/EngineDataSetWithTimeGenerator.java | 35 +---
.../dataset/EngineDataSetWithoutTimeGenerator.java | 4 +
.../groupby/GroupByEngineDataSet.java | 11 +-
.../groupby/GroupByWithOnlyTimeFilterDataSet.java} | 8 +-
.../groupby/GroupByWithValueFilterDataSet.java} | 8 +-
.../db/query/executor/AggregateEngineExecutor.java | 2 +-
.../EngineExecutorWithoutTimeGenerator.java | 102 +---------
.../iotdb/db/query/executor/EngineQueryRouter.java | 44 ++---
.../executor/ExecutorWithoutTimeGenerator.java | 80 ++++++++
.../db/query/executor/IEngineQueryRouter.java | 78 ++++++++
.../{aggregation => factory}/AggreFuncFactory.java | 3 +-
...nstructor.java => AbstractNodeConstructor.java} | 57 +++---
.../query/timegenerator/EngineNodeConstructor.java | 62 +-----
.../query/timegenerator/EngineTimeGenerator.java | 1 -
.../org/apache/iotdb/db/service/JDBCService.java | 10 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 218 +++++++++++++--------
.../iotdb/db/sync/receiver/SyncServiceImpl.java | 2 +-
.../iotdb/db/sync/receiver/SyncServiceManager.java | 5 +-
.../apache/iotdb/db/writelog/io/RAFLogReader.java | 2 +-
.../db/writelog/node/ExclusiveWriteLogNode.java | 2 +-
.../db/concurrent/IoTDBThreadPoolFactoryTest.java | 2 +-
.../iotdb/db/integration/IoTDBSeriesReaderIT.java | 2 +-
.../iotdb/db/metadata/MManagerBasicTest.java | 29 +--
.../org/apache/iotdb/db/metadata/MTreeTest.java | 28 +--
.../org/apache/iotdb/db/metadata/MetadataTest.java | 93 +++++++++
.../org/apache/iotdb/db/qp/QueryProcessorTest.java | 88 ++++++++-
.../transfer/PhysicalPlanLogTransferTest.java | 2 +-
.../org/apache/iotdb/db/qp/plan/QPUpdateTest.java | 2 +-
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 3 +-
.../EngineDataSetWithTimeGeneratorTest.java | 2 +-
.../query/executor/GroupByEngineDataSetTest.java | 14 +-
.../org/apache/iotdb/db/tools/WalCheckerTest.java | 2 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 2 +-
.../apache/iotdb/db/writelog/PerformanceTest.java | 3 +-
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 2 +-
.../iotdb/db/writelog/io/LogWriterReaderTest.java | 3 +-
.../apache/iotdb/tsfile/read/common/BatchData.java | 4 +-
.../org/apache/iotdb/tsfile/read/common/Field.java | 22 +++
.../org/apache/iotdb/tsfile/read/common/Path.java | 4 +-
.../tsfile/read/expression/ExpressionType.java | 2 +-
.../tsfile/read/expression/IBinaryExpression.java | 6 +-
.../iotdb/tsfile/read/expression/IExpression.java | 4 +-
.../read/expression/impl/BinaryExpression.java | 38 +++-
.../read/expression/impl/GlobalTimeExpression.java | 10 +-
.../expression/impl/SingleSeriesExpression.java | 10 +-
.../tsfile/read/filter/basic/BinaryFilter.java | 3 +
.../iotdb/tsfile/read/filter/basic/Filter.java | 2 +
.../tsfile/read/filter/basic/UnaryFilter.java | 3 +
.../tsfile/read/filter/operator/AndFilter.java | 7 +-
.../iotdb/tsfile/read/filter/operator/Eq.java | 6 +
.../iotdb/tsfile/read/filter/operator/Gt.java | 6 +
.../iotdb/tsfile/read/filter/operator/GtEq.java | 6 +
.../iotdb/tsfile/read/filter/operator/Lt.java | 6 +
.../iotdb/tsfile/read/filter/operator/LtEq.java | 6 +
.../iotdb/tsfile/read/filter/operator/NotEq.java | 6 +
.../tsfile/read/filter/operator/NotFilter.java | 5 +
.../tsfile/read/filter/operator/OrFilter.java | 5 +
.../query/dataset/DataSetWithTimeGenerator.java | 32 ---
.../tsfile/read/query/dataset/QueryDataSet.java | 36 ++++
.../java/org/apache/iotdb/tsfile/utils/Pair.java | 5 +-
108 files changed, 1189 insertions(+), 670 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index f191607..7399903 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@ -110,18 +110,18 @@ public class IoTDBThreadPoolFactory {
}
/**
- * function for creating JDBC client thread pool.
+ * function for creating thrift rpc client thread pool.
*/
- public static ExecutorService createJDBCClientThreadPool(Args args, String poolName) {
+ public static ExecutorService createThriftRpcClientThreadPool(Args args, String poolName) {
SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, args.stopTimeoutVal,
args.stopTimeoutUnit, executorQueue, new IoTThreadFactory(poolName));
}
/**
- * function for creating JDBC client thread pool.
+ * function for creating thrift rpc client thread pool.
*/
- public static ExecutorService createJDBCClientThreadPool(Args args, String poolName,
+ public static ExecutorService createThriftRpcClientThreadPool(Args args, String poolName,
Thread.UncaughtExceptionHandler handler) {
SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, args.stopTimeoutVal,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 6053da6..a1c3e44 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -41,7 +41,7 @@ public enum ThreadName {
private String name;
- private ThreadName(String name) {
+ ThreadName(String name) {
this.name = name;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9790f37..4bf5ac8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -23,6 +23,7 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.service.TSServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -269,13 +270,18 @@ public class IoTDBConfig {
* data, choose "true". 2. It's more likely not to update historical data or you don't know
* exactly, choose "false".
*/
- private boolean update_historical_data_possibility = false;
+ private boolean updateHistoricalDataPossibility = false;
private String ipWhiteList = "0.0.0.0/0";
/**
* Examining period of cache file reader : 100 seconds.
*/
private long cacheFileReaderClearPeriod = 100000;
+ /**
+ * Class name of the RPC (JDBC) service implementation; replaceable, defaults to TSServiceImpl
+ */
+ private String rpcImplClassName = TSServiceImpl.class.getName();
+
public IoTDBConfig() {
// empty constructor
}
@@ -792,12 +798,12 @@ public class IoTDBConfig {
this.languageVersion = languageVersion;
}
- public boolean isUpdate_historical_data_possibility() {
- return update_historical_data_possibility;
+ public boolean isUpdateHistoricalDataPossibility() {
+ return updateHistoricalDataPossibility;
}
- public void setUpdate_historical_data_possibility(boolean update_historical_data_possibility) {
- this.update_historical_data_possibility = update_historical_data_possibility;
+ public void setUpdateHistoricalDataPossibility(boolean updateHistoricalDataPossibility) {
+ this.updateHistoricalDataPossibility = updateHistoricalDataPossibility;
}
public String getIpWhiteList() {
@@ -815,4 +821,12 @@ public class IoTDBConfig {
public void setCacheFileReaderClearPeriod(long cacheFileReaderClearPeriod) {
this.cacheFileReaderClearPeriod = cacheFileReaderClearPeriod;
}
+
+ public String getRpcImplClassName() {
+ return rpcImplClassName;
+ }
+
+ public void setRpcImplClassName(String rpcImplClassName) {
+ this.rpcImplClassName = rpcImplClassName;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 5135e79..48e927f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -20,7 +20,8 @@ package org.apache.iotdb.db.conf;
public class IoTDBConstant {
- private IoTDBConstant() {}
+ private IoTDBConstant() {
+ }
public static final String ENV_FILE_NAME = "iotdb-env";
public static final String IOTDB_CONF = "IOTDB_CONF";
@@ -55,4 +56,7 @@ public class IoTDBConstant {
public static final String MAX_TIME = "max_time";
public static final String MIN_TIME = "min_time";
public static final int MIN_SUPPORTED_JDK_VERSION = 8;
+
+ // for cluster, set read consistency level
+ public static final String SET_READ_CONSISTENCY_LEVEL_PATTERN = "set\\s+read.*level.*";
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7ab5215..38892fd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -51,6 +51,7 @@ public class IoTDBDescriptor {
*/
private void loadProps() {
InputStream inputStream;
+
String url = System.getProperty(IoTDBConstant.IOTDB_CONF, null);
if (url == null) {
url = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
@@ -112,10 +113,10 @@ public class IoTDBDescriptor {
conf.setRpcAddress(properties.getProperty("rpc_address", conf.getRpcAddress()));
conf.setRpcPort(Integer.parseInt(properties.getProperty("rpc_port",
- Integer.toString(conf.getRpcPort()))));
+ Integer.toString(conf.getRpcPort()))));
conf.setEnableWal(Boolean.parseBoolean(properties.getProperty("enable_wal",
- Boolean.toString(conf.isEnableWal()))));
+ Boolean.toString(conf.isEnableWal()))));
conf.setFlushWalThreshold(Integer
.parseInt(properties.getProperty("flush_wal_threshold",
@@ -196,7 +197,7 @@ public class IoTDBDescriptor {
conf.setSyncServerPort(Integer
.parseInt(properties.getProperty("sync_server_port",
Integer.toString(conf.getSyncServerPort())).trim()));
- conf.setUpdate_historical_data_possibility(Boolean.parseBoolean(
+ conf.setUpdateHistoricalDataPossibility(Boolean.parseBoolean(
properties.getProperty("update_historical_data_possibility",
Boolean.toString(conf.isSyncEnable()))));
conf.setIpWhiteList(properties.getProperty("IP_white_list", conf.getIpWhiteList()));
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
index 7c996aa..86feaf0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
@@ -94,16 +94,16 @@ public class FlushManager {
*
* @param block
* if set to true, this method will wait for timeOut milliseconds.
- * @param timeOut
+ * @param timeout
* block time out in milliseconds.
* @throws ProcessorException
* if timeOut is reached or being interrupted while waiting to exit.
*/
- public void close(boolean block, long timeOut) throws ProcessorException {
+ public void close(boolean block, long timeout) throws ProcessorException {
pool.shutdown();
if (block) {
try {
- if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+ if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
throw new ProcessorException("Flush thread pool doesn't exit after "
+ EXIT_WAIT_TIME + " ms");
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
index 75190f2..44874bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
@@ -62,16 +62,16 @@ public class MergeManager {
*
* @param block if set block to true, this method will wait for timeOut milliseconds to close the
* merge pool. false, return directly.
- * @param timeOut block time out in milliseconds.
+ * @param timeout block time out in milliseconds.
* @throws ProcessorException if timeOut reach or interrupted while waiting to exit.
*/
- public void forceClose(boolean block, long timeOut) throws ProcessorException {
+ public void forceClose(boolean block, long timeout) throws ProcessorException {
pool.shutdownNow();
if (block) {
try {
- if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+ if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
throw new ProcessorException(
- "Merge thread pool doesn't exit after " + timeOut + " ms");
+ "Merge thread pool doesn't exit after " + timeout + " ms");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
index cc6e1cf..4c887af 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
@@ -41,6 +41,7 @@ public class MGraph implements Serializable {
private static final long serialVersionUID = 8214849219614352834L;
private static final String DOUB_SEPARATOR = "\\.";
private static final String TIME_SERIES_INCORRECT = "Timeseries's root is not Correct. RootName: ";
+ public static final String TIME_SERIES_TREE_HEADER = "=== Timeseries Tree ===\n\n";
private MTree mtree;
private HashMap<String, PTree> ptreeMap;
@@ -169,16 +170,6 @@ public class MGraph implements Serializable {
}
/**
- * Check whether the storage group of the input path exists or not
- *
- * @param path Format: root.node.(node)*
- * @apiNote :for cluster
- */
- public boolean checkStorageExistOfPath(String path) {
- return mtree.checkStorageExistOfPath(path);
- }
-
- /**
* Get all paths for given seriesPath regular expression if given seriesPath belongs to MTree, or
* get all linked seriesPath for given seriesPath if given seriesPath belongs to PTree Notice:
* Regular expression in this method is formed by the amalgamation of seriesPath and the character
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index ea4ec6f..c45a3f0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -357,19 +357,6 @@ public class MManager {
}
/**
- * function for checking if the storage group of given path exists in mTree or not.
- * @apiNote :for cluster
- */
- public boolean checkStorageExistOfPath(String path) {
- lock.readLock().lock();
- try {
- return mgraph.checkStorageExistOfPath(path);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- /**
* function for adding a pTree.
*/
public void addAPTree(String ptreeRootName) throws IOException, MetadataArgsErrorException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index f5bf6a2..37ddce9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -52,6 +52,15 @@ public class MTree implements Serializable {
private static final String NOT_SERIES_PATH = "The prefix of the seriesPath %s is not one storage group seriesPath";
private MNode root;
+ private static final Set<String> TIMESERIES_METADATA_NAMESET = new HashSet<>();
+ static {
+ TIMESERIES_METADATA_NAMESET.add("DataType");
+ TIMESERIES_METADATA_NAMESET.add("Encoding");
+ TIMESERIES_METADATA_NAMESET.add("Compressor");
+ TIMESERIES_METADATA_NAMESET.add("args");
+ TIMESERIES_METADATA_NAMESET.add("StorageGroup");
+ }
+
public MTree(String rootName) {
this.root = new MNode(rootName, null, false);
}
@@ -248,38 +257,6 @@ public class MTree implements Serializable {
}
/**
- * Check whether the storage group of the path exists or not
- * @param path input path
- * @return If it's storage group exists, return true. Else return false
- * @apiNote :for cluster
- */
- public boolean checkStorageExistOfPath(String path) {
- String[] nodeNames = path.split(DOUB_SEPARATOR);
- MNode cur = root;
- if (nodeNames.length <= 1 || !nodeNames[0].equals(root.getName())) {
- return false;
- }
- int i = 1;
- while (i < nodeNames.length - 1) {
- MNode temp = cur.getChild(nodeNames[i]);
- if (temp == null) {
- return false;
- }
- if(temp.isStorageLevel()){
- return true;
- }
- cur = cur.getChild(nodeNames[i]);
- i++;
- }
- MNode temp = cur.getChild(nodeNames[i]);
- if(temp != null && temp.isStorageLevel()) {
- return true;
- } else {
- return false;
- }
- }
-
- /**
* Check whether set file seriesPath for this node or not. If not, throw an exception
*/
private void checkStorageGroup(MNode node) throws PathErrorException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
index 40fd0e4..5c56fe1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
@@ -18,15 +18,21 @@
*/
package org.apache.iotdb.db.metadata;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
/**
* This class stores all the metadata info for every deviceId and every timeseries.
*/
-public class Metadata {
+public class Metadata implements Serializable {
private Map<String, List<MeasurementSchema>> seriesMap;
private Map<String, List<String>> deviceIdMap;
@@ -67,9 +73,122 @@ public class Metadata {
return deviceIdMap;
}
+ /**
+ * Combine multiple Metadata objects into one by merging their series maps and device id maps.
+ */
+ public static Metadata combineMetadatas(Metadata[] metadatas) {
+ Map<String, List<MeasurementSchema>> seriesMap = new HashMap<>();
+ Map<String, List<String>> deviceIdMap = new HashMap<>();
+ Map<String, Map<String, MeasurementSchema>> typeSchemaMap = new HashMap<>();
+
+ if (metadatas == null || metadatas.length == 0) {
+ return new Metadata(seriesMap, deviceIdMap);
+ }
+
+ for (int i = 0; i < metadatas.length; i++) {
+ Map<String, List<MeasurementSchema>> subSeriesMap = metadatas[i].seriesMap;
+ for (Entry<String, List<MeasurementSchema>> entry : subSeriesMap.entrySet()) {
+ Map<String, MeasurementSchema> map;
+ if (typeSchemaMap.containsKey(entry.getKey())) {
+ map = typeSchemaMap.get(entry.getKey());
+ } else {
+ map = new HashMap<>();
+ }
+ entry.getValue().forEach(schema -> map.put(schema.getMeasurementId(), schema));
+ if (!typeSchemaMap.containsKey(entry.getKey())) {
+ typeSchemaMap.put(entry.getKey(), map);
+ }
+ }
+
+ Map<String, List<String>> subDeviceIdMap = metadatas[i].deviceIdMap;
+ for (Entry<String, List<String>> entry : subDeviceIdMap.entrySet()) {
+ List<String> list;
+ if (deviceIdMap.containsKey(entry.getKey())) {
+ list = deviceIdMap.get(entry.getKey());
+ } else {
+ list = new ArrayList<>();
+ }
+ list.addAll(entry.getValue());
+ if (!deviceIdMap.containsKey(entry.getKey())) {
+ deviceIdMap.put(entry.getKey(), list);
+ }
+ }
+ }
+
+ for (Entry<String, Map<String, MeasurementSchema>> entry : typeSchemaMap.entrySet()) {
+ List<MeasurementSchema> list = new ArrayList<>();
+ list.addAll(entry.getValue().values());
+ seriesMap.put(entry.getKey(), list);
+ }
+
+ return new Metadata(seriesMap, deviceIdMap);
+ }
+
@Override
public String toString() {
return seriesMap.toString() + "\n" + deviceIdMap.toString();
}
+ @Override
+ public boolean equals(Object obj) {
+ if(this == obj){
+ return true;
+ }
+ if(obj == null){
+ return false;
+ }
+ if(this.getClass() != obj.getClass()){
+ return false;
+ }
+
+ Metadata metadata = (Metadata) obj;
+ return seriesMapEquals(seriesMap, metadata.seriesMap) && deviceIdMapEquals(deviceIdMap, metadata.deviceIdMap);
+ }
+
+ /**
+ * only used to check if seriesMap is equal to another seriesMap
+ */
+ private boolean seriesMapEquals(Map<String, List<MeasurementSchema>> map1, Map<String, List<MeasurementSchema>> map2) {
+ if (!map1.keySet().equals(map2.keySet())) {
+ return false;
+ }
+
+ for (Entry<String, List<MeasurementSchema>> entry : map1.entrySet()) {
+ List list1 = entry.getValue();
+ List list2 = map2.get(entry.getKey());
+
+ if (!listEquals(list1, list2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * only used to check if deviceIdMap is equal to another deviceIdMap
+ */
+ private boolean deviceIdMapEquals(Map<String, List<String>> map1, Map<String, List<String>> map2) {
+ if (!map1.keySet().equals(map2.keySet())) {
+ return false;
+ }
+
+ for (Entry<String, List<String>> entry : map1.entrySet()) {
+ List list1 = entry.getValue();
+ List list2 = map2.get(entry.getKey());
+
+ if (!listEquals(list1, list2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean listEquals(List list1, List list2) {
+ Set set1 = new HashSet();
+ set1.addAll(list1);
+ Set set2 = new HashSet();
+ set2.addAll(list2);
+
+ return set1.equals(set2);
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
index c64c71a..f63514d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.exception.qp.IllegalASTFormatException;
import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.RootOperator;
@@ -49,13 +50,13 @@ import org.apache.iotdb.db.sql.parse.ParseUtils;
*/
public class QueryProcessor {
- private QueryProcessExecutor executor;
+ private IQueryProcessExecutor executor;
- public QueryProcessor(QueryProcessExecutor executor) {
+ public QueryProcessor(IQueryProcessExecutor executor) {
this.executor = executor;
}
- public QueryProcessExecutor getExecutor() {
+ public IQueryProcessExecutor getExecutor() {
return executor;
}
@@ -122,7 +123,7 @@ public class QueryProcessor {
* @throws LogicalOptimizeException
* exception in logical optimizing
*/
- private Operator logicalOptimize(Operator operator, QueryProcessExecutor executor)
+ private Operator logicalOptimize(Operator operator, IQueryProcessExecutor executor)
throws LogicalOperatorException {
switch (operator.getType()) {
case AUTHOR:
@@ -156,7 +157,7 @@ public class QueryProcessor {
* @throws LogicalOptimizeException
* exception in SFW optimizing
*/
- private SFWOperator optimizeSFWOperator(SFWOperator root, QueryProcessExecutor executor)
+ private SFWOperator optimizeSFWOperator(SFWOperator root, IQueryProcessExecutor executor)
throws LogicalOperatorException {
ConcatPathOptimizer concatPathOptimizer = new ConcatPathOptimizer(executor);
root = (SFWOperator) concatPathOptimizer.transform(root);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
new file mode 100644
index 0000000..15bcb5a
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.executor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public interface IQueryProcessExecutor {
+
+ boolean processNonQuery(PhysicalPlan plan) throws ProcessorException;
+
+ /**
+ * process query plan of qp layer, construct queryDataSet.
+ *
+ * @param queryPlan QueryPlan
+ * @return QueryDataSet
+ */
+ QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
+ throws IOException, FileNodeManagerException, PathErrorException,
+ QueryFilterOptimizationException, ProcessorException;
+
+ /**
+ * process aggregate plan of qp layer, construct queryDataSet.
+ *
+ */
+ QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
+ QueryContext context)
+ throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
+
+ /**
+ * process group by plan of qp layer, construct queryDataSet.
+ */
+ QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
+ long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
+ throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
+
+ /**
+ * process fill plan of qp layer, construct queryDataSet.
+ */
+ QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
+ QueryContext context)
+ throws ProcessorException, IOException, PathErrorException, FileNodeManagerException;
+
+ /**
+ * execute update command and return whether the operation is successful.
+ *
+ * @param path seriesPath of the series to update
+ * @param startTime start time in update command
+ * @param endTime end time in update command
+ * @param value new value, given as a string
+ * @return - whether the operation is successful.
+ */
+ boolean update(Path path, long startTime, long endTime, String value)
+ throws ProcessorException;
+
+ /**
+ * execute delete command on several series and return whether the operation is successful.
+ *
+ * @param paths seriesPaths of the series to delete from
+ * @param deleteTime end time in delete command
+ * @return - whether the operation is successful.
+ */
+ boolean delete(List<Path> paths, long deleteTime) throws ProcessorException;
+
+ /**
+ * execute delete command on a single series and return whether the operation is successful.
+ *
+ * @param path seriesPath of the series to delete from
+ * @param deleteTime end time in delete command
+ * @return - whether the operation is successful.
+ */
+ boolean delete(Path path, long deleteTime) throws ProcessorException;
+
+ /**
+ * insert a single value. Only used in test
+ *
+ * @param path seriesPath to be inserted
+ * @param insertTime - it's time point but not a range
+ * @param value value to be inserted
+ * @return - Operate Type.
+ */
+ int insert(Path path, long insertTime, String value) throws ProcessorException;
+
+ /**
+ * execute insert command for several measurements of one device and return the operate type.
+ *
+ * @param deviceId deviceId to be inserted
+ * @param insertTime - it's time point but not a range
+ * @param measurementList measurements to be inserted
+ * @param insertValues values to be inserted
+ * @return - Operate Type.
+ */
+ int multiInsert(String deviceId, long insertTime, String[] measurementList,
+ String[] insertValues) throws ProcessorException;
+
+ boolean judgePathExists(Path fullPath);
+
+ /**
+ * Get data type of series
+ */
+ TSDataType getSeriesType(Path path) throws PathErrorException;
+
+ /**
+ * Get all paths of a full path
+ */
+ List<String> getAllPaths(String originPath) throws PathErrorException;
+
+ int getFetchSize();
+
+ void setFetchSize(int fetchSize);
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index b80c393..9ca337e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.EngineQueryRouter;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.LoadDataUtils;
@@ -226,7 +227,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
}
@Override
- protected boolean delete(Path path, long timestamp) throws ProcessorException {
+ public boolean delete(Path path, long timestamp) throws ProcessorException {
String deviceId = path.getDevice();
String measurementId = path.getMeasurement();
try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 376997e..99476f2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -35,28 +34,18 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
-import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.Pair;
-public abstract class QueryProcessExecutor {
+public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
protected ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
- protected EngineQueryRouter queryRouter = new EngineQueryRouter();
+ protected IEngineQueryRouter queryRouter = new EngineQueryRouter();
- public QueryProcessExecutor() {
- }
-
- /**
- * process query plan of qp layer, construct queryDataSet.
- * @param queryPlan QueryPlan
- * @return QueryDataSet
- */
+ @Override
public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
throws IOException, FileNodeManagerException, PathErrorException,
QueryFilterOptimizationException, ProcessorException {
@@ -72,7 +61,7 @@ public abstract class QueryProcessExecutor {
if (queryPlan instanceof AggregationPlan) {
return aggregate(queryPlan.getPaths(), queryPlan.getAggregations(),
- ((AggregationPlan) queryPlan).getExpression(), context);
+ queryPlan.getExpression(), context);
}
if (queryPlan instanceof FillQueryPlan) {
@@ -83,14 +72,7 @@ public abstract class QueryProcessExecutor {
return queryRouter.query(queryExpression, context);
}
- public abstract TSDataType getSeriesType(Path fullPath) throws PathErrorException;
-
- public abstract boolean judgePathExists(Path fullPath);
-
- public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
- throw new UnsupportedOperationException();
- }
-
+ @Override
public int getFetchSize() {
if (fetchSize.get() == null) {
return 100;
@@ -98,42 +80,12 @@ public abstract class QueryProcessExecutor {
return fetchSize.get();
}
+ @Override
public void setFetchSize(int fetchSize) {
this.fetchSize.set(fetchSize);
}
- public abstract QueryDataSet aggregate(List<Path> paths, List<String> aggres,
- IExpression expression, QueryContext context) throws ProcessorException, IOException,
- PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
-
- public abstract QueryDataSet groupBy(List<Path> paths, List<String> aggres,
- IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
- QueryContext context) throws ProcessorException, IOException, PathErrorException,
- FileNodeManagerException, QueryFilterOptimizationException;
-
- public abstract QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType,
- IFill> fillTypes, QueryContext context)
- throws ProcessorException, IOException, PathErrorException, FileNodeManagerException;
-
- /**
- * executeWithGlobalTimeFilter update command and return whether the operator is successful.
- *
- * @param path : update series seriesPath
- * @param startTime start time in update command
- * @param endTime end time in update command
- * @param value - in type of string
- * @return - whether the operator is successful.
- */
- public abstract boolean update(Path path, long startTime, long endTime, String value)
- throws ProcessorException;
-
- /**
- * executeWithGlobalTimeFilter delete command and return whether the operator is successful.
- *
- * @param paths : delete series paths
- * @param deleteTime end time in delete command
- * @return - whether the operator is successful.
- */
+ @Override
public boolean delete(List<Path> paths, long deleteTime) throws ProcessorException {
try {
boolean result = true;
@@ -162,37 +114,4 @@ public abstract class QueryProcessExecutor {
}
}
- /**
- * executeWithGlobalTimeFilter delete command and return whether the operator is successful.
- *
- * @param path : delete series seriesPath
- * @param deleteTime end time in delete command
- * @return - whether the operator is successful.
- */
- protected abstract boolean delete(Path path, long deleteTime) throws ProcessorException;
-
- /**
- * insert a single value. Only used in test
- *
- * @param path seriesPath to be inserted
- * @param insertTime - it's time point but not a range
- * @param value value to be inserted
- * @return - Operate Type.
- */
- public abstract int insert(Path path, long insertTime, String value) throws ProcessorException;
-
- /**
- * executeWithGlobalTimeFilter insert command and return whether the operator is successful.
- *
- * @param deviceId deviceId to be inserted
- * @param insertTime - it's time point but not a range
- * @param measurementList measurements to be inserted
- * @param insertValues values to be inserted
- * @return - Operate Type.
- */
- public abstract int multiInsert(String deviceId, long insertTime, String[] measurementList,
- String[] insertValues) throws ProcessorException;
-
- public abstract List<String> getAllPaths(String originPath) throws PathErrorException;
-
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index c7a9eb7..498b429 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -72,6 +72,6 @@ public abstract class Operator {
SET_STORAGE_GROUP, CREATE_TIMESERIES, DELETE_TIMESERIES, CREATE_USER, DELETE_USER, MODIFY_PASSWORD,
GRANT_USER_PRIVILEGE, REVOKE_USER_PRIVILEGE, GRANT_USER_ROLE, REVOKE_USER_ROLE, CREATE_ROLE,
DELETE_ROLE, GRANT_ROLE_PRIVILEGE, REVOKE_ROLE_PRIVILEGE, LIST_USER, LIST_ROLE,
- LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS;
+ LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
index 1472e54..1610554 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.logical.crud;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -97,7 +98,7 @@ public class BasicFunctionOperator extends FunctionOperator {
@Override
protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
- QueryProcessExecutor executor)
+ IQueryProcessExecutor executor)
throws LogicalOperatorException, PathErrorException {
TSDataType type = executor.getSeriesType(path);
if (type == null) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
index a6c032b..1f38654 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -111,7 +112,7 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
*
* @return QueryFilter in TsFile
*/
- public IExpression transformToExpression(QueryProcessExecutor executor)
+ public IExpression transformToExpression(IQueryProcessExecutor executor)
throws QueryProcessorException {
if (isSingle) {
Pair<IUnaryExpression, String> ret = transformToSingleQueryFilter(executor);
@@ -149,7 +150,7 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
* @throws QueryProcessorException exception in filter transforming
*/
protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
- QueryProcessExecutor executor)
+ IQueryProcessExecutor executor)
throws QueryProcessorException {
if (childOperators.isEmpty()) {
throw new LogicalOperatorException(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 77dd6e4..d2f7bb9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical;
+import java.io.Serializable;
import java.util.List;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -25,8 +26,9 @@ import org.apache.iotdb.tsfile.read.common.Path;
/**
* This class is a abstract class for all type of PhysicalPlan.
*/
-public abstract class PhysicalPlan {
+public abstract class PhysicalPlan implements Serializable {
+ private static final long serialVersionUID = -6274856391535568352L;
private boolean isQuery;
private Operator.OperatorType operatorType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 43a5e28..6e5f430 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.qp.logical.Operator;
public class AggregationPlan extends QueryPlan {
+ private static final long serialVersionUID = -2049810573809076643L;
private List<String> aggregations = new ArrayList<>();
public AggregationPlan() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
index b89717b..ae183bf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class DeletePlan extends PhysicalPlan {
+ private static final long serialVersionUID = -6532570247476907037L;
private long deleteTime;
private List<Path> paths = new ArrayList<>();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
index 7f0399f..4fd666d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class FillQueryPlan extends QueryPlan {
+ private static final long serialVersionUID = -2091710518816582444L;
private long queryTime;
private Map<TSDataType, IFill> fillType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
index f4087eb..d016c36 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
public class GroupByPlan extends AggregationPlan {
+ private static final long serialVersionUID = 8769258112457178898L;
private long unit;
private long origin;
private List<Pair<Long, Long>> intervals; // show intervals
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 3e0c13a..cdb519c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class InsertPlan extends PhysicalPlan {
+ private static final long serialVersionUID = 6102845312368561515L;
private String deviceId;
private String[] measurements;
private String[] values;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 4e60c5b..6bf7c0e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.qp.physical.crud;
import java.util.List;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
public class QueryPlan extends PhysicalPlan {
+ private static final long serialVersionUID = -5865840981549195660L;
private List<Path> paths = null;
private IExpression expression = null;
@@ -43,7 +44,7 @@ public class QueryPlan extends PhysicalPlan {
/**
* Check if all paths exist.
*/
- public void checkPaths(QueryProcessExecutor executor) throws QueryProcessorException {
+ public void checkPaths(IQueryProcessExecutor executor) throws QueryProcessorException {
for (Path path : paths) {
if (!executor.judgePathExists(path)) {
throw new QueryProcessorException("Path doesn't exist: " + path);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
index 5da9c2a..60768d6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.utils.StringContainer;
public class UpdatePlan extends PhysicalPlan {
+ private static final long serialVersionUID = 8952248212926920033L;
private List<Pair<Long, Long>> intervals = new ArrayList<>();
private String value;
private Path path;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
index b4b108a..3ec812c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class AuthorPlan extends PhysicalPlan {
+ private static final long serialVersionUID = 6501894026593590182L;
private final AuthorOperator.AuthorType authorType;
private String userName;
private String roleName;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
index 7e2eb7a..f9b4553 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class LoadDataPlan extends PhysicalPlan {
+ private static final long serialVersionUID = -6631296704227106470L;
private final String inputFilePath;
private final String measureType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
index 93a98a3..e3b71f5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class MetadataPlan extends PhysicalPlan {
+ private static final long serialVersionUID = -3717406842093744475L;
private final MetadataOperator.NamespaceType namespaceType;
private Path path;
private TSDataType dataType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
index 2f2c591..94ec623 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
*/
public class PropertyPlan extends PhysicalPlan {
+ private static final long serialVersionUID = -1462399624512066104L;
private final PropertyOperator.PropertyType propertyType;
private Path propertyPath;
private Path metadataPath;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/Codec.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/Codec.java
similarity index 91%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/Codec.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/Codec.java
index 76c0598..51fc06b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/Codec.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/Codec.java
@@ -16,10 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
import java.io.IOException;
-import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
interface Codec<T extends PhysicalPlan> {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/CodecInstances.java
similarity index 99%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/CodecInstances.java
index 31b8527..4fcaac6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/CodecInstances.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -81,8 +81,9 @@ public class CodecInstances {
*/
static String readString(ByteBuffer buffer) {
int valueLen = buffer.getInt();
- if(valueLen == NULL_VALUE_LEN)
+ if (valueLen == NULL_VALUE_LEN) {
return null;
+ }
return ReadWriteIOUtils.readStringWithoutLength(buffer, valueLen);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanCodec.java
similarity index 78%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanCodec.java
index b2189ba..11ba358 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanCodec.java
@@ -16,22 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
public enum PhysicalPlanCodec {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransfer.java
similarity index 98%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransfer.java
index 838ddb7..6d11b11 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransfer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
import java.io.IOException;
import java.nio.BufferOverflowException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/SystemLogOperator.java
similarity index 96%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/SystemLogOperator.java
index 19b4d29..3993980 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/SystemLogOperator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
/**
* To avoid conflict with org.apache.iotdb.tsfiledb.qp.constant.SQLConstant.Operator.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index d33e044..ff13a2c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
import org.apache.iotdb.db.qp.logical.crud.DeleteOperator;
@@ -57,9 +57,9 @@ import org.slf4j.LoggerFactory;
public class PhysicalGenerator {
private static final Logger logger = LoggerFactory.getLogger(PhysicalGenerator.class);
- private QueryProcessExecutor executor;
+ private IQueryProcessExecutor executor;
- public PhysicalGenerator(QueryProcessExecutor executor) {
+ public PhysicalGenerator(IQueryProcessExecutor executor) {
this.executor = executor;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 5626923..9736f7c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
@@ -46,9 +47,9 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
private static final Logger LOG = LoggerFactory.getLogger(ConcatPathOptimizer.class);
private static final String WARNING_NO_SUFFIX_PATHS = "given SFWOperator doesn't have suffix paths, cannot concat seriesPath";
- private QueryProcessExecutor executor;
+ private IQueryProcessExecutor executor;
- public ConcatPathOptimizer(QueryProcessExecutor executor) {
+ public ConcatPathOptimizer(IQueryProcessExecutor executor) {
this.executor = executor;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
index 0efae01..41cef8c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
@@ -44,7 +44,7 @@ public class AggreResultData {
this.isSetValue = false;
}
- public void reSet() {
+ public void reset() {
isSetValue = false;
isSetTime = false;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index 020aaf1..99ef06c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -40,7 +40,7 @@ public class CountAggrFunc extends AggregateFunction {
@Override
public void init() {
- resultData.reSet();
+ resultData.reset();
resultData.setTimestamp(0);
resultData.setLongRet(0);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
index 3e8bb05..0688303 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
@@ -37,7 +37,7 @@ public class FirstAggrFunc extends AggregateFunction {
@Override
public void init() {
- resultData.reSet();
+ resultData.reset();
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
index 8c12728..b8ea98b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
@@ -37,7 +37,7 @@ public class LastAggrFunc extends AggregateFunction {
@Override
public void init() {
- resultData.reSet();
+ resultData.reset();
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
index 73b8f1c..ba3460b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
@@ -37,7 +37,7 @@ public class MaxTimeAggrFunc extends AggregateFunction {
@Override
public void init() {
- resultData.reSet();
+ resultData.reset();
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
index 8e2d2a7..40bea9c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
@@ -36,7 +36,7 @@ public class MaxValueAggrFunc extends AggregateFunction {
@Override
public void init() {
- resultData.reSet();
+ resultData.reset();
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
index da6cc95..5373018 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
@@ -42,7 +42,7 @@ public class MeanAggrFunc extends AggregateFunction {
@Override
public void init() {
- resultData.reSet();
+ resultData.reset();
sum = 0.0;
cnt = 0;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
index f083812..278fde2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
@@ -36,7 +36,7 @@ public class MinTimeAggrFunc extends AggregateFunction {
@Override
public void init() {
- resultData.reSet();
+ resultData.reset();
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
index ad64fb3..137f8fe 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
@@ -36,7 +36,7 @@ public class MinValueAggrFunc extends AggregateFunction {
@Override
public void init() {
- resultData.reSet();
+ resultData.reset();
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index e4ef67e..c5f029e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -142,6 +142,23 @@ public class QueryResourceManager {
}
}
+ /**
+ * Begin query and set query tokens of all filter paths in expression. This method is used in
+ * filter calculation.
+ * @param remoteDeviceIdSet set of device ids that cannot be handled locally
+ * Note: this method is for cluster use.
+ */
+ public void beginQueryOfGivenExpression(long jobId, IExpression expression,
+ Set<String> remoteDeviceIdSet) throws FileNodeManagerException {
+ Set<String> deviceIdSet = new HashSet<>();
+ getUniquePaths(expression, deviceIdSet);
+ deviceIdSet.removeAll(remoteDeviceIdSet);
+ for (String deviceId : deviceIdSet) {
+ putQueryTokenForCurrentRequestThread(jobId, deviceId,
+ FileNodeManager.getInstance().beginQuery(deviceId));
+ }
+ }
+
public QueryDataSource getQueryDataSource(Path selectedPath,
QueryContext context)
throws FileNodeManagerException {
@@ -165,12 +182,12 @@ public class QueryResourceManager {
// no resource need to be released.
return;
}
- for (Map.Entry<String, List<Integer>> entry : queryTokensMap.get(jobId).entrySet()) {
- for (int token : entry.getValue()) {
- FileNodeManager.getInstance().endQuery(entry.getKey(), token);
- }
+ for (Map.Entry<String, List<Integer>> entry : queryTokensMap.get(jobId).entrySet()) {
+ for (int token : entry.getValue()) {
+ FileNodeManager.getInstance().endQuery(entry.getKey(), token);
}
- queryTokensMap.remove(jobId);
+ }
+ queryTokensMap.remove(jobId);
// remove usage of opened file paths of current thread
filePathsManager.removeUsedFilesForGivenJob(jobId);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
index c2edab0..6545bfa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
@@ -36,7 +36,7 @@ public class AggreResultDataPointReader implements IPointReader {
@Override
public TimeValuePair next() {
TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(aggreResultData);
- aggreResultData.reSet();
+ aggreResultData.reset();
return timeValuePair;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java
index 6e76e66..04b46f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java
@@ -98,36 +98,11 @@ public class EngineDataSetWithTimeGenerator extends QueryDataSet {
return hasCachedRowRecord;
}
- private Field getField(Object value, TSDataType dataType) {
- Field field = new Field(dataType);
-
- if (value == null) {
- field.setNull();
- return field;
- }
+ public EngineTimeGenerator getTimeGenerator() {
+ return timeGenerator;
+ }
- switch (dataType) {
- case DOUBLE:
- field.setDoubleV((double) value);
- break;
- case FLOAT:
- field.setFloatV((float) value);
- break;
- case INT64:
- field.setLongV((long) value);
- break;
- case INT32:
- field.setIntV((int) value);
- break;
- case BOOLEAN:
- field.setBoolV((boolean) value);
- break;
- case TEXT:
- field.setBinaryV((Binary) value);
- break;
- default:
- throw new UnSupportedDataTypeException("UnSupported: " + dataType);
- }
- return field;
+ public List<EngineReaderByTimeStamp> getReaders() {
+ return readers;
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
index 73fc71f..bc9bb08 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
@@ -154,4 +154,8 @@ public class EngineDataSetWithoutTimeGenerator extends QueryDataSet {
timeSet.remove(t);
return t;
}
+
+ public List<IPointReader> getReaders() {
+ return readers;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByEngineDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
similarity index 96%
rename from iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByEngineDataSet.java
rename to iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 2ccecf5..cd1b95b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByEngineDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -17,14 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.db.query.executor.groupby;
+package org.apache.iotdb.db.query.dataset.groupby;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
+import org.apache.iotdb.db.query.factory.AggreFuncFactory;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -90,15 +90,12 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
return true;
}
- // end
- if (usedIndex >= mergedIntervals.size()) {
- return false;
- }
-
// skip the intervals in coverage of last time-partition
while (usedIndex < mergedIntervals.size() && mergedIntervals.get(usedIndex).right < endTime) {
usedIndex++;
}
+
+ // end
if (usedIndex >= mergedIntervals.size()) {
return false;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
similarity index 97%
rename from iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java
rename to iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
index e82b567..e2688ba 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.query.executor.groupby;
+package org.apache.iotdb.db.query.dataset.groupby;
import java.io.IOException;
import java.util.ArrayList;
@@ -45,7 +45,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
-public class GroupByWithOnlyTimeFilterDataSetDataSet extends GroupByEngineDataSet {
+public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet {
protected List<IPointReader> unSequenceReaderList;
protected List<IAggregateReader> sequenceReaderList;
@@ -56,7 +56,7 @@ public class GroupByWithOnlyTimeFilterDataSetDataSet extends GroupByEngineDataSe
/**
* constructor.
*/
- public GroupByWithOnlyTimeFilterDataSetDataSet(long jobId, List<Path> paths, long unit,
+ public GroupByWithOnlyTimeFilterDataSet(long jobId, List<Path> paths, long unit,
long origin, List<Pair<Long, Long>> mergedIntervals) {
super(jobId, paths, unit, origin, mergedIntervals);
this.unSequenceReaderList = new ArrayList<>();
@@ -103,7 +103,7 @@ public class GroupByWithOnlyTimeFilterDataSetDataSet extends GroupByEngineDataSe
public RowRecord next() throws IOException {
if (!hasCachedTimeInterval) {
throw new IOException("need to call hasNext() before calling next() "
- + "in GroupByWithOnlyTimeFilterDataSetDataSet.");
+ + "in GroupByWithOnlyTimeFilterDataSet.");
}
hasCachedTimeInterval = false;
RowRecord record = new RowRecord(startTime);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
similarity index 94%
rename from iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java
rename to iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 7a6b3d2..f7ffa29 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.query.executor.groupby;
+package org.apache.iotdb.db.query.dataset.groupby;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,7 +38,7 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.utils.Pair;
-public class GroupByWithValueFilterDataSetDataSet extends GroupByEngineDataSet {
+public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
private List<EngineReaderByTimeStamp> allDataReaderList;
private TimeGenerator timestampGenerator;
@@ -59,7 +59,7 @@ public class GroupByWithValueFilterDataSetDataSet extends GroupByEngineDataSet {
/**
* constructor.
*/
- public GroupByWithValueFilterDataSetDataSet(long jobId, List<Path> paths, long unit, long origin,
+ public GroupByWithValueFilterDataSet(long jobId, List<Path> paths, long unit, long origin,
List<Pair<Long, Long>> mergedIntervals) {
super(jobId, paths, unit, origin, mergedIntervals);
this.allDataReaderList = new ArrayList<>();
@@ -85,7 +85,7 @@ public class GroupByWithValueFilterDataSetDataSet extends GroupByEngineDataSet {
public RowRecord next() throws IOException {
if (!hasCachedTimeInterval) {
throw new IOException("need to call hasNext() before calling next()"
- + " in GroupByWithOnlyTimeFilterDataSetDataSet.");
+ + " in GroupByWithOnlyTimeFilterDataSet.");
}
hasCachedTimeInterval = false;
for (AggregateFunction function : functions) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index ba76c2f..508a787 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
+import org.apache.iotdb.db.query.factory.AggreFuncFactory;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index 07073cd..b9464ba 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -22,18 +22,11 @@ package org.apache.iotdb.db.query.executor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
-import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
-import org.apache.iotdb.db.query.reader.AllDataReader;
import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
@@ -44,7 +37,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
/**
* IoTDB query executor with global time filter.
*/
-public class EngineExecutorWithoutTimeGenerator {
+public class EngineExecutorWithoutTimeGenerator extends ExecutorWithoutTimeGenerator {
private QueryExpression queryExpression;
@@ -53,67 +46,16 @@ public class EngineExecutorWithoutTimeGenerator {
}
/**
- * with global time filter.
+ * Execute the query without a filter or with a global time filter.
*/
- public QueryDataSet executeWithGlobalTimeFilter(QueryContext context)
+ public QueryDataSet execute(QueryContext context)
throws FileNodeManagerException {
- Filter timeFilter = ((GlobalTimeExpression) queryExpression.getExpression()).getFilter();
-
- List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
-
- QueryResourceManager.getInstance()
- .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
-
- for (Path path : queryExpression.getSelectedSeries()) {
-
- QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
- context);
-
- // add data type
- try {
- dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
- } catch (PathErrorException e) {
- throw new FileNodeManagerException(e);
- }
-
- // sequence reader for one sealed tsfile
- SequenceDataReader tsFilesReader;
- try {
- tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- timeFilter, context);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
-
- // unseq reader for all chunk groups in unSeqFile
- PriorityMergeReader unSeqMergeReader;
- try {
- unSeqMergeReader = SeriesReaderFactory.getInstance()
- .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
-
- // merge sequence data with unsequence data.
- readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
+ Filter timeFilter = null;
+ if (queryExpression.hasQueryFilter()) {
+ timeFilter = ((GlobalTimeExpression) queryExpression.getExpression()).getFilter();
}
- try {
- return new EngineDataSetWithoutTimeGenerator(queryExpression.getSelectedSeries(), dataTypes,
- readersOfSelectedSeries);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
- }
-
- /**
- * without filter.
- */
- public QueryDataSet executeWithoutFilter(QueryContext context)
- throws FileNodeManagerException {
-
List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
@@ -122,36 +64,8 @@ public class EngineExecutorWithoutTimeGenerator {
for (Path path : queryExpression.getSelectedSeries()) {
- QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
- context);
-
- // add data type
- try {
- dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
- } catch (PathErrorException e) {
- throw new FileNodeManagerException(e);
- }
-
- // sequence insert data
- SequenceDataReader tsFilesReader;
- try {
- tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- null, context);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
-
- // unseq insert data
- PriorityMergeReader unSeqMergeReader;
- try {
- unSeqMergeReader = SeriesReaderFactory.getInstance()
- .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
-
- // merge sequence data with unsequence data.
- readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
+ IPointReader reader = createSeriesReader(context, path, dataTypes, timeFilter);
+ readersOfSelectedSeries.add(reader);
}
try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 96ad5ad..03c600d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.executor.groupby.GroupByWithOnlyTimeFilterDataSetDataSet;
-import org.apache.iotdb.db.query.executor.groupby.GroupByWithValueFilterDataSetDataSet;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -47,13 +47,9 @@ import org.apache.iotdb.tsfile.utils.Pair;
* Query entrance class of IoTDB query process. All query clause will be transformed to physical
* plan, physical plan will be executed by EngineQueryRouter.
*/
-public class EngineQueryRouter {
+public class EngineQueryRouter implements IEngineQueryRouter{
-
-
- /**
- * execute physical plan.
- */
+ @Override
public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
throws FileNodeManagerException {
@@ -66,7 +62,7 @@ public class EngineQueryRouter {
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
EngineExecutorWithoutTimeGenerator engineExecutor =
new EngineExecutorWithoutTimeGenerator(queryExpression);
- return engineExecutor.executeWithGlobalTimeFilter(context);
+ return engineExecutor.execute(context);
} else {
EngineExecutorWithTimeGenerator engineExecutor = new EngineExecutorWithTimeGenerator(
queryExpression);
@@ -79,13 +75,11 @@ public class EngineQueryRouter {
} else {
EngineExecutorWithoutTimeGenerator engineExecutor = new EngineExecutorWithoutTimeGenerator(
queryExpression);
- return engineExecutor.executeWithoutFilter(context);
+ return engineExecutor.execute(context);
}
}
- /**
- * execute aggregation query.
- */
+ @Override
public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
IExpression expression, QueryContext context) throws QueryFilterOptimizationException,
FileNodeManagerException, IOException, PathErrorException, ProcessorException {
@@ -107,17 +101,7 @@ public class EngineQueryRouter {
}
}
- /**
- * execute groupBy query.
- *
- * @param selectedSeries select path list
- * @param aggres aggregation name list
- * @param expression filter expression
- * @param unit time granularity for interval partitioning, unit is ms.
- * @param origin the datum time point for interval division is divided into a time interval for
- * each TimeUnit time from this point forward and backward.
- * @param intervals time intervals, closed interval.
- */
+ @Override
public QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
QueryContext context)
@@ -164,12 +148,12 @@ public class EngineQueryRouter {
IExpression optimizedExpression = ExpressionOptimizer.getInstance()
.optimize(expression, selectedSeries);
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
- GroupByWithOnlyTimeFilterDataSetDataSet groupByEngine = new GroupByWithOnlyTimeFilterDataSetDataSet(
+ GroupByWithOnlyTimeFilterDataSet groupByEngine = new GroupByWithOnlyTimeFilterDataSet(
nextJobId, selectedSeries, unit, origin, mergedIntervalList);
groupByEngine.initGroupBy(context, aggres, optimizedExpression);
return groupByEngine;
} else {
- GroupByWithValueFilterDataSetDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(
+ GroupByWithValueFilterDataSet groupByEngine = new GroupByWithValueFilterDataSet(
nextJobId,
selectedSeries, unit, origin, mergedIntervalList);
groupByEngine.initGroupBy(context, aggres, optimizedExpression);
@@ -177,13 +161,7 @@ public class EngineQueryRouter {
}
}
- /**
- * execute fill query.
- *
- * @param fillPaths select path list
- * @param queryTime timestamp
- * @param fillType type IFill map
- */
+ @Override
public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
QueryContext context)
throws FileNodeManagerException, PathErrorException, IOException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/ExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/ExecutorWithoutTimeGenerator.java
new file mode 100644
index 0000000..4e5a66a
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/ExecutorWithoutTimeGenerator.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.AllDataReader;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+public abstract class ExecutorWithoutTimeGenerator {
+ /**
+ * Create a reader of a series that merges its sequence data with its unsequence data.
+ *
+ * @param context query context
+ * @param path series path
+ * @param dataTypes list of data types; the data type of the series is appended to it
+ * @param timeFilter time filter
+ * @return reader of the series
+ */
+ public static IPointReader createSeriesReader(QueryContext context, Path path,
+ List<TSDataType> dataTypes, Filter timeFilter)
+ throws FileNodeManagerException {
+ QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
+ context);
+ // add data type
+ try {
+ dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+ } catch (PathErrorException e) {
+ throw new FileNodeManagerException(e);
+ }
+
+ // sequence reader for one sealed tsfile
+ SequenceDataReader tsFilesReader;
+ try {
+ tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
+ timeFilter, context);
+ } catch (IOException e) {
+ throw new FileNodeManagerException(e);
+ }
+
+ // unseq reader for all chunk groups in unSeqFile
+ PriorityMergeReader unSeqMergeReader;
+ try {
+ unSeqMergeReader = SeriesReaderFactory.getInstance()
+ .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+ } catch (IOException e) {
+ throw new FileNodeManagerException(e);
+ }
+ // merge sequence data with unsequence data.
+ return new AllDataReader(tsFilesReader, unSeqMergeReader);
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
new file mode 100644
index 0000000..01c1aed
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public interface IEngineQueryRouter {
+
+ /**
+ * Execute physical plan.
+ */
+ QueryDataSet query(QueryExpression queryExpression, QueryContext context)
+ throws FileNodeManagerException, PathErrorException;
+
+ /**
+ * Execute aggregation query.
+ */
+ QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
+ IExpression expression, QueryContext context)
+ throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException;
+
+ /**
+ * Execute groupBy query.
+ *
+ * @param selectedSeries select path list
+ * @param aggres aggregation name list
+ * @param expression filter expression
+ * @param unit time granularity for interval partitioning, unit is ms.
+ * @param origin the datum time point for interval division is divided into a time interval for
+ * each TimeUnit time from this point forward and backward.
+ * @param intervals time intervals, closed interval.
+ */
+ QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
+ IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
+ QueryContext context)
+ throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException,
+ PathErrorException, IOException;
+ /**
+ * Execute fill query.
+ *
+ * @param fillPaths select path list
+ * @param queryTime timestamp
+ * @param fillType type IFill map
+ */
+ QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
+ QueryContext context) throws FileNodeManagerException, PathErrorException, IOException;
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/AggreFuncFactory.java
similarity index 96%
rename from iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
rename to iotdb/src/main/java/org/apache/iotdb/db/query/factory/AggreFuncFactory.java
index c2d1883..495d853 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/AggreFuncFactory.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.query.aggregation;
+package org.apache.iotdb.db.query.factory;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.impl.CountAggrFunc;
import org.apache.iotdb.db.query.aggregation.impl.FirstAggrFunc;
import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
similarity index 70%
copy from iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
copy to iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
index 4ffe62b..712868f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
@@ -16,12 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.db.query.timegenerator;
import static org.apache.iotdb.tsfile.read.expression.ExpressionType.AND;
import static org.apache.iotdb.tsfile.read.expression.ExpressionType.OR;
-import static org.apache.iotdb.tsfile.read.expression.ExpressionType.SERIES;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -42,48 +40,44 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
-public class EngineNodeConstructor {
-
-
- public EngineNodeConstructor() {
- }
+public abstract class AbstractNodeConstructor {
/**
- * construct expression node.
+ * Construct expression node.
*
* @param expression expression
* @return Node object
* @throws IOException IOException
* @throws FileNodeManagerException FileNodeManagerException
*/
- public Node construct(IExpression expression, QueryContext context)
+ public abstract Node construct(IExpression expression, QueryContext context)
+ throws FileNodeManagerException;
+
+ /**
+ * Construct a node for a non-series expression (OR / AND); other types are rejected.
+ * @param expression expression
+ * @return Node object
+ * @throws FileNodeManagerException FileNodeManagerException
+ */
+ protected Node constructNotSeriesNode(IExpression expression, QueryContext context)
throws FileNodeManagerException {
- if (expression.getType() == SERIES) {
- try {
- return new EngineLeafNode(generateSeriesReader((SingleSeriesExpression) expression,
- context));
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
+ Node leftChild;
+ Node rightChild;
+ if (expression.getType() == OR) {
+ leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
+ rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
+ return new OrNode(leftChild, rightChild);
+ } else if (expression.getType() == AND) {
+ leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
+ rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
+ return new AndNode(leftChild, rightChild);
} else {
- Node leftChild;
- Node rightChild;
- if (expression.getType() == OR) {
- leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
- rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
- return new OrNode(leftChild, rightChild);
- } else if (expression.getType() == AND) {
- leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
- rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
- return new AndNode(leftChild, rightChild);
- } else {
- throw new UnSupportedDataTypeException(
- "Unsupported QueryFilterType when construct OperatorNode: " + expression.getType());
- }
+ throw new UnSupportedDataTypeException(
+ "Unsupported QueryFilterType when construct OperatorNode: " + expression.getType());
}
}
- private IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression,
+ protected IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression,
QueryContext context)
throws IOException, FileNodeManagerException {
@@ -108,5 +102,4 @@ public class EngineNodeConstructor {
return new AllDataReader(tsFilesReader, unSeqMergeReader);
}
}
-
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
index 4ffe62b..45e6ac6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
@@ -19,43 +19,29 @@
package org.apache.iotdb.db.query.timegenerator;
-import static org.apache.iotdb.tsfile.read.expression.ExpressionType.AND;
-import static org.apache.iotdb.tsfile.read.expression.ExpressionType.OR;
import static org.apache.iotdb.tsfile.read.expression.ExpressionType.SERIES;
import java.io.IOException;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
-import org.apache.iotdb.db.query.reader.AllDataReader;
-import org.apache.iotdb.db.query.reader.IReader;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
-import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
-
-public class EngineNodeConstructor {
+public class EngineNodeConstructor extends AbstractNodeConstructor {
public EngineNodeConstructor() {
}
/**
- * construct expression node.
+ * Construct expression node.
*
* @param expression expression
* @return Node object
* @throws IOException IOException
* @throws FileNodeManagerException FileNodeManagerException
*/
+ @Override
public Node construct(IExpression expression, QueryContext context)
throws FileNodeManagerException {
if (expression.getType() == SERIES) {
@@ -66,47 +52,7 @@ public class EngineNodeConstructor {
throw new FileNodeManagerException(e);
}
} else {
- Node leftChild;
- Node rightChild;
- if (expression.getType() == OR) {
- leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
- rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
- return new OrNode(leftChild, rightChild);
- } else if (expression.getType() == AND) {
- leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
- rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
- return new AndNode(leftChild, rightChild);
- } else {
- throw new UnSupportedDataTypeException(
- "Unsupported QueryFilterType when construct OperatorNode: " + expression.getType());
- }
- }
- }
-
- private IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression,
- QueryContext context)
- throws IOException, FileNodeManagerException {
-
- QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(
- singleSeriesExpression.getSeriesPath(), context);
-
- Filter filter = singleSeriesExpression.getFilter();
-
- // reader for all sequence data
- SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- filter, context);
-
- // reader for all unSequence data
- PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
- .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), filter);
-
- if (!tsFilesReader.hasNext()) {
- //only have unsequence data.
- return unSeqMergeReader;
- } else {
- //merge sequence data with unsequence data.
- return new AllDataReader(tsFilesReader, unSeqMergeReader);
+ return constructNotSeriesNode(expression, context);
}
}
-
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
index 350ea6f..13d7dbc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
@@ -61,7 +61,6 @@ public class EngineTimeGenerator implements TimeGenerator {
@Override
public Object getValue(Path path, long time) {
- // TODO implement the optimization
return null;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
index d9ccecf..1326e23 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
@@ -129,7 +129,7 @@ public class JDBCService implements JDBCServiceMBean, IService {
jdbcServiceThread.setName(ThreadName.JDBC_SERVICE.getName());
jdbcServiceThread.start();
startLatch.await();
- } catch (IOException | InterruptedException e) {
+ } catch (IOException | InterruptedException | ClassNotFoundException | IllegalAccessException | InstantiationException e) {
String errorMessage = String
.format("Failed to start %s because of %s", this.getID().getName(),
e.getMessage());
@@ -187,9 +187,11 @@ public class JDBCService implements JDBCServiceMBean, IService {
private CountDownLatch threadStartLatch;
private CountDownLatch threadStopLatch;
- public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch) throws IOException {
+ public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
+ throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException {
protocolFactory = new TBinaryProtocol.Factory();
- impl = new TSServiceImpl();
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
processor = new TSIService.Processor<>(impl);
this.threadStartLatch = threadStartLatch;
this.threadStopLatch = threadStopLatch;
@@ -202,7 +204,7 @@ public class JDBCService implements JDBCServiceMBean, IService {
serverTransport = new TServerSocket(new InetSocketAddress(config.getRpcAddress(),
config.getRpcPort()));
poolArgs = new TThreadPoolServer.Args(serverTransport);
- poolArgs.executorService = IoTDBThreadPoolFactory.createJDBCClientThreadPool(poolArgs,
+ poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
ThreadName.JDBC_CLIENT.getName());
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index f355445..b9dbf04 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -81,6 +82,7 @@ import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
import org.apache.iotdb.service.rpc.thrift.TS_Status;
import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.thrift.TException;
@@ -95,21 +97,21 @@ import org.slf4j.LoggerFactory;
public class TSServiceImpl implements TSIService.Iface, ServerContext {
private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceImpl.class);
- private static final String INFO_NOT_LOGIN = "{}: Not login.";
- private static final String ERROR_NOT_LOGIN = "Not login";
+ protected static final String INFO_NOT_LOGIN = "{}: Not login.";
+ protected static final String ERROR_NOT_LOGIN = "Not login";
- private QueryProcessor processor = new QueryProcessor(new OverflowQPExecutor());
+ protected QueryProcessor processor;
// Record the username for every rpc connection. Username.get() is null if
// login is failed.
- private ThreadLocal<String> username = new ThreadLocal<>();
- private ThreadLocal<HashMap<String, PhysicalPlan>> queryStatus = new ThreadLocal<>();
- private ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new ThreadLocal<>();
- private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
+ protected ThreadLocal<String> username = new ThreadLocal<>();
+ protected ThreadLocal<HashMap<String, PhysicalPlan>> queryStatus = new ThreadLocal<>();
+ protected ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new ThreadLocal<>();
+ protected ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
+ protected ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
public TSServiceImpl() throws IOException {
- // do nothing because there is no need
+ processor = new QueryProcessor(new OverflowQPExecutor());
}
@Override
@@ -118,7 +120,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
req.getUsername());
boolean status;
- IAuthorizer authorizer = null;
+ IAuthorizer authorizer;
try {
authorizer = LocalFileAuthorizer.getInstance();
} catch (AuthException e) {
@@ -191,29 +193,30 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
releaseQueryResource(req);
clearAllStatusForCurrentRequest();
- } catch (FileNodeManagerException e) {
+ } catch (Exception e) {
LOGGER.error("Error in closeOperation : {}", e.getMessage());
}
return new TSCloseOperationResp(new TS_Status(TS_StatusCode.SUCCESS_STATUS));
}
- private void releaseQueryResource(TSCloseOperationReq req) throws FileNodeManagerException {
+ public void releaseQueryResource(TSCloseOperationReq req) throws Exception {
Map<Long, QueryContext> contextMap = contextMapLocal.get();
if (contextMap == null) {
return;
}
- if(req == null || req.queryId == -1) {
+ if (req == null || req.queryId == -1) {
// end query for all the query tokens created by current thread
for (QueryContext context : contextMap.values()) {
QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
}
+ contextMapLocal.set(new HashMap<>());
} else {
QueryResourceManager.getInstance()
.endQueryForGivenJob(contextMap.remove(req.queryId).getJobId());
}
}
- private void clearAllStatusForCurrentRequest() {
+ public void clearAllStatusForCurrentRequest() {
if (this.queryRet.get() != null) {
this.queryRet.get().clear();
}
@@ -222,7 +225,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
- private TS_Status getErrorStatus(String message) {
+ public TS_Status getErrorStatus(String message) {
TS_Status status = new TS_Status(TS_StatusCode.ERROR_STATUS);
status.setErrorMessage(message);
return status;
@@ -241,10 +244,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
case "SHOW_TIMESERIES":
String path = req.getColumnPath();
try {
- List<List<String>> showTimeseriesList = MManager.getInstance()
- .getShowTimeseriesPath(path);
+ List<List<String>> showTimeseriesList = getTimeSeriesForPath(path);
resp.setShowTimeseriesList(showTimeseriesList);
- } catch (PathErrorException e) {
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
status = getErrorStatus(
String.format("Failed to fetch timeseries %s's metadata because: %s",
req.getColumnPath(), e));
@@ -264,9 +266,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
break;
case "SHOW_STORAGE_GROUP":
try {
- Set<String> storageGroups = MManager.getInstance().getAllStorageGroup();
+ Set<String> storageGroups = getAllStorageGroups();
resp.setShowStorageGroups(storageGroups);
- } catch (PathErrorException e) {
+ } catch (PathErrorException | InterruptedException e) {
status = getErrorStatus(
String.format("Failed to fetch storage groups' metadata because: %s", e));
resp.setStatus(status);
@@ -282,9 +284,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
status = new TS_Status(TS_StatusCode.SUCCESS_STATUS);
break;
case "METADATA_IN_JSON":
- String metadataInJson = null;
+ String metadataInJson;
try {
- metadataInJson = MManager.getInstance().getMetadataInString();
+ metadataInJson = getMetadataInString();
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
+ status = getErrorStatus(
+ String.format("Failed to fetch all metadata in json because: %s", e));
+ resp.setStatus(status);
+ return resp;
} catch (OutOfMemoryError outOfMemoryError) { // TODO OOME
LOGGER.error("Failed to fetch all metadata in json", outOfMemoryError);
status = getErrorStatus(
@@ -299,14 +306,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
Metadata metadata;
try {
String column = req.getColumnPath();
- metadata = MManager.getInstance().getMetadata();
+ metadata = getMetadata();
Map<String, List<String>> deviceMap = metadata.getDeviceMap();
if (deviceMap == null || !deviceMap.containsKey(column)) {
resp.setColumnsList(new ArrayList<>());
} else {
resp.setColumnsList(deviceMap.get(column));
}
- } catch (PathErrorException e) {
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
LOGGER.error("cannot get delta object map", e);
status = getErrorStatus(String.format("Failed to fetch delta object map because: %s", e));
resp.setStatus(status);
@@ -321,8 +328,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
break;
case "COLUMN":
try {
- resp.setDataType(MManager.getInstance().getSeriesType(req.getColumnPath()).toString());
- } catch (PathErrorException e) {
+ resp.setDataType(getSeriesType(req.getColumnPath()).toString());
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
// TODO aggregate seriesPath e.g. last(root.ln.wf01.wt01.status)
// status = new TS_Status(TS_StatusCode.ERROR_STATUS);
// status.setErrorMessage(String.format("Failed to fetch %s's data type because: %s",
@@ -334,8 +341,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
break;
case "ALL_COLUMNS":
try {
- resp.setColumnsList(MManager.getInstance().getPaths(req.getColumnPath()));
- } catch (PathErrorException e) {
+ resp.setColumnsList(getPaths(req.getColumnPath()));
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
status = getErrorStatus(String
.format("Failed to fetch %s's all columns because: %s", req.getColumnPath(), e));
resp.setStatus(status);
@@ -359,14 +366,43 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return resp;
}
+ protected Set<String> getAllStorageGroups() throws PathErrorException, InterruptedException {
+ return MManager.getInstance().getAllStorageGroup();
+ }
+
+ protected List<List<String>> getTimeSeriesForPath(String path)
+ throws PathErrorException, InterruptedException, ProcessorException {
+ return MManager.getInstance().getShowTimeseriesPath(path);
+ }
+
+ protected String getMetadataInString()
+ throws InterruptedException, PathErrorException, ProcessorException {
+ return MManager.getInstance().getMetadataInString();
+ }
+
+ protected Metadata getMetadata()
+ throws PathErrorException, InterruptedException, ProcessorException {
+ return MManager.getInstance().getMetadata();
+ }
+
+ protected TSDataType getSeriesType(String path)
+ throws PathErrorException, InterruptedException, ProcessorException {
+ return MManager.getInstance().getSeriesType(path);
+ }
+
+ protected List<String> getPaths(String path)
+ throws PathErrorException, InterruptedException, ProcessorException {
+ return MManager.getInstance().getPaths(path);
+ }
+
/**
- * Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it.
+ * Judge whether the statement is ADMIN COMMAND and if true, execute it.
*
* @param statement command
* @return true if the statement is ADMIN COMMAND
* @throws IOException exception
*/
- private boolean execAdminCommand(String statement) throws IOException {
+ public boolean execAdminCommand(String statement) throws IOException {
if (!"root".equals(username.get())) {
return false;
}
@@ -427,7 +463,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
} catch (Exception e) {
String errMessage = String.format(
- "Fail to generate physcial plan and executeWithGlobalTimeFilter for statement "
+ "Fail to generate physcial plan and execute for statement "
+ "%s beacuse %s",
statement, e.getMessage());
result.add(Statement.EXECUTE_FAILED);
@@ -465,6 +501,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
}
+ try {
+ if (execSetConsistencyLevel(statement)) {
+ return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS,
+ "Execute set consistency level successfully");
+ }
+ } catch (Exception e) {
+ return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
+ }
+
PhysicalPlan physicalPlan;
try {
physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
@@ -488,6 +533,22 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
+ /**
+ * Detect a set-consistency-level statement; the stand-alone version rejects it by throwing.
+ */
+ public boolean execSetConsistencyLevel(String statement) throws Exception {
+ if (statement == null) {
+ return false;
+ }
+ statement = statement.toLowerCase().trim();
+ if (Pattern.matches(IoTDBConstant.SET_READ_CONSISTENCY_LEVEL_PATTERN, statement)) {
+ throw new Exception(
+ "IoTDB Stand-alone version does not support setting read-write consistency level");
+ } else {
+ return false;
+ }
+ }
+
@Override
public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
@@ -510,8 +571,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
// check file level set
+
try {
- MManager.getInstance().checkFileLevel(paths);
+ checkFileLevelSet(paths);
} catch (PathErrorException e) {
LOGGER.error("meet error while checking file level.", e);
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
@@ -592,6 +654,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
+ public void checkFileLevelSet(List<Path> paths) throws PathErrorException {
+ MManager.getInstance().checkFileLevel(paths);
+ }
+
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) throws TException {
try {
@@ -627,19 +693,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
- private QueryDataSet createNewDataSet(String statement, int fetchSize, TSFetchResultsReq req)
+ public QueryDataSet createNewDataSet(String statement, int fetchSize, TSFetchResultsReq req)
throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException,
ProcessorException, IOException {
PhysicalPlan physicalPlan = queryStatus.get().get(statement);
processor.getExecutor().setFetchSize(fetchSize);
QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId());
- Map<Long, QueryContext> contextMap = contextMapLocal.get();
- if (contextMap == null) {
- contextMap = new HashMap<>();
- contextMapLocal.set(contextMap);
- }
- contextMap.put(req.queryId, context);
+
+ initContextMap();
+ contextMapLocal.get().put(req.queryId, context);
QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
context);
@@ -647,6 +710,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return queryDataSet;
}
+ public void initContextMap(){
+ Map<Long, QueryContext> contextMap = contextMapLocal.get();
+ if (contextMap == null) {
+ contextMap = new HashMap<>();
+ contextMapLocal.set(contextMap);
+ }
+ }
+
@Override
public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
throws TException {
@@ -665,7 +736,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
- private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan) {
+ public TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan) {
List<Path> paths = plan.getPaths();
try {
@@ -683,7 +754,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// Do we need to add extra information of executive condition
boolean execRet;
try {
- execRet = processor.getExecutor().processNonQuery(plan);
+ execRet = executeNonQuery(plan);
} catch (ProcessorException e) {
LOGGER.debug("meet error while processing non-query. {}", e.getMessage());
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
@@ -700,6 +771,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return resp;
}
+ protected boolean executeNonQuery(PhysicalPlan plan) throws ProcessorException {
+ return processor.getExecutor().processNonQuery(plan);
+ }
+
private TSExecuteStatementResp executeUpdateStatement(String statement)
throws ProcessorException {
@@ -717,43 +792,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
"Statement is a query statement.");
}
- // if operation belongs to add/delete/update
- List<Path> paths = physicalPlan.getPaths();
-
- try {
- if (!checkAuthorization(paths, physicalPlan)) {
- return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
- "No permissions for this operation " + physicalPlan.getOperatorType());
- }
- } catch (AuthException e) {
- LOGGER.error("meet error while checking authorization.", e);
- return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
- "Uninitialized authorizer : " + e.getMessage());
- }
-
- // TODO
- // In current version, we only return OK/ERROR
- // Do we need to add extra information of executive condition
- boolean execRet;
- try {
- execRet = processor.getExecutor().processNonQuery(physicalPlan);
- } catch (ProcessorException e) {
- LOGGER.error("meet error while processing non-query.", e);
- return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
- }
- TS_StatusCode statusCode = execRet ? TS_StatusCode.SUCCESS_STATUS : TS_StatusCode.ERROR_STATUS;
- String msg = execRet ? "Execute successfully" : "Execute statement error.";
- TSExecuteStatementResp resp = getTSExecuteStatementResp(statusCode, msg);
- TSHandleIdentifier operationId = new TSHandleIdentifier(
- ByteBuffer.wrap(username.get().getBytes()),
- ByteBuffer.wrap(("PASS".getBytes())));
- TSOperationHandle operationHandle;
- operationHandle = new TSOperationHandle(operationId, false);
- resp.setOperationHandle(operationHandle);
- return resp;
+ return executeUpdateStatement(physicalPlan);
}
- private void recordANewQuery(String statement, PhysicalPlan physicalPlan) {
+ public void recordANewQuery(String statement, PhysicalPlan physicalPlan) {
queryStatus.get().put(statement, physicalPlan);
// refresh current queryRet for statement
if (queryRet.get().containsKey(statement)) {
@@ -766,11 +808,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
*
* @return true: If logined; false: If not logined
*/
- private boolean checkLogin() {
+ public boolean checkLogin() {
return username.get() != null;
}
- private boolean checkAuthorization(List<Path> paths, PhysicalPlan plan) throws AuthException {
+ public boolean checkAuthorization(List<Path> paths, PhysicalPlan plan) throws AuthException {
String targetUser = null;
if (plan instanceof AuthorPlan) {
targetUser = ((AuthorPlan) plan).getUserName();
@@ -778,7 +820,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return AuthorityChecker.check(username.get(), paths, plan.getOperatorType(), targetUser);
}
- private TSExecuteStatementResp getTSExecuteStatementResp(TS_StatusCode code, String msg) {
+ public TSExecuteStatementResp getTSExecuteStatementResp(TS_StatusCode code, String msg) {
TSExecuteStatementResp resp = new TSExecuteStatementResp();
TS_Status tsStatus = new TS_Status(code);
tsStatus.setErrorMessage(msg);
@@ -791,7 +833,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return resp;
}
- private TSExecuteBatchStatementResp getTSBathExecuteStatementResp(TS_StatusCode code, String msg,
+ public TSExecuteBatchStatementResp getTSBathExecuteStatementResp(TS_StatusCode code, String msg,
List<Integer> result) {
TSExecuteBatchStatementResp resp = new TSExecuteBatchStatementResp();
TS_Status tsStatus = new TS_Status(code);
@@ -801,7 +843,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return resp;
}
- private TSFetchResultsResp getTSFetchResultsResp(TS_StatusCode code, String msg) {
+ public TSFetchResultsResp getTSFetchResultsResp(TS_StatusCode code, String msg) {
TSFetchResultsResp resp = new TSFetchResultsResp();
TS_Status tsStatus = new TS_Status(code);
tsStatus.setErrorMessage(msg);
@@ -810,14 +852,22 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
public void handleClientExit() throws TException {
+ closeClusterService();
closeOperation(null);
closeSession(null);
}
+ /**
+ * Close cluster service. No-op in this class; presumably a hook for subclasses to override.
+ */
+ public void closeClusterService() {
+
+ }
+
@Override
public TSGetTimeZoneResp getTimeZone() throws TException {
- TS_Status tsStatus = null;
- TSGetTimeZoneResp resp = null;
+ TS_Status tsStatus;
+ TSGetTimeZoneResp resp;
try {
tsStatus = new TS_Status(TS_StatusCode.SUCCESS_STATUS);
resp = new TSGetTimeZoneResp(tsStatus, zoneIds.get().toString());
@@ -832,7 +882,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSSetTimeZoneResp setTimeZone(TSSetTimeZoneReq req) throws TException {
- TS_Status tsStatus = null;
+ TS_Status tsStatus;
try {
String timeZoneID = req.getTimeZone();
zoneIds.set(ZoneId.of(timeZoneID));
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
index 400065e..ad2722d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
@@ -477,7 +477,7 @@ public class SyncServiceImpl implements SyncService.Iface {
try {
if (!fileNodeManager.appendFileToFileNode(storageGroup, fileNode, path)) {
// it is a file with overflow data
- if (config.isUpdate_historical_data_possibility()) {
+ if (config.isUpdateHistoricalDataPossibility()) {
loadOldData(path);
} else {
List<String> overlapFiles = fileNodeManager.getOverlapFilesFromFileNode(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
index c0c1c00..47e8f10 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.sync.receiver;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -149,8 +150,10 @@ public class SyncServiceManager implements IService {
protocolFactory = new TBinaryProtocol.Factory();
processor = new SyncService.Processor<>(new SyncServiceImpl());
poolArgs = new TThreadPoolServer.Args(serverTransport);
- poolArgs.processor(processor);
+ poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
+ ThreadName.SYNC_CLIENT.getName());
poolArgs.protocolFactory(protocolFactory);
+ poolArgs.processor(processor);
poolServer = new TThreadPoolServer(poolArgs);
poolServer.setServerEventHandler(new SyncServiceEventHandler(threadStartLatch));
poolServer.serve();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java
index 1834569..13c5737 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java
@@ -25,7 +25,7 @@ import java.io.RandomAccessFile;
import java.util.NoSuchElementException;
import java.util.zip.CRC32;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 2c1fd72..d8e99ae 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.db.writelog.io.ILogWriter;
import org.apache.iotdb.db.writelog.io.LogWriter;
import org.apache.iotdb.db.writelog.recover.ExclusiveLogRecoverPerformer;
import org.apache.iotdb.db.writelog.recover.RecoverPerformer;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java b/iotdb/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java
index 40c8179..93b5db4 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java
@@ -173,7 +173,7 @@ public class IoTDBThreadPoolFactoryTest {
int threadCount = 50;
latch = new CountDownLatch(threadCount);
ExecutorService exec = IoTDBThreadPoolFactory
- .createJDBCClientThreadPool(args, POOL_NAME, handler);
+ .createThriftRpcClientThreadPool(args, POOL_NAME, handler);
for (int i = 0; i < threadCount; i++) {
Runnable task = new TestThread(reason);
exec.execute(task);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index 6b8d413..aa2ef7c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -138,7 +138,7 @@ public class IoTDBSeriesReaderIT {
statement.execute(sql);
}
- // statement.executeWithGlobalTimeFilter("flush");
+ // statement.execute("flush");
// insert large amount of data time range : 13700 ~ 24000
for (int time = 13700; time < 24000; time++) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 36dba77..4fbf8bf 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.metadata;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
@@ -315,27 +317,28 @@ public class MManagerBasicTest {
}
}
+ @Test
public void testCheckStorageExistOfPath() {
MManager manager = MManager.getInstance();
try {
- assertEquals(false, manager.checkStorageExistOfPath("root"));
- assertEquals(false, manager.checkStorageExistOfPath("root.vehicle"));
- assertEquals(false, manager.checkStorageExistOfPath("root.vehicle.device"));
- assertEquals(false, manager.checkStorageExistOfPath("root.vehicle.device.sensor"));
+ assertTrue(manager.getAllPathGroupByFileName("root").keySet().isEmpty());
+ assertTrue(manager.getAllFileNamesByPath("root.vehicle").isEmpty());
+ assertTrue(manager.getAllFileNamesByPath("root.vehicle.device").isEmpty());
+ assertTrue(manager.getAllFileNamesByPath("root.vehicle.device.sensor").isEmpty());
manager.setStorageLevelToMTree("root.vehicle");
- assertEquals(true, manager.checkStorageExistOfPath("root.vehicle"));
- assertEquals(true, manager.checkStorageExistOfPath("root.vehicle.device"));
- assertEquals(true, manager.checkStorageExistOfPath("root.vehicle.device.sensor"));
- assertEquals(false, manager.checkStorageExistOfPath("root.vehicle1"));
- assertEquals(false, manager.checkStorageExistOfPath("root.vehicle1.device"));
+ assertFalse(manager.getAllFileNamesByPath("root.vehicle").isEmpty());
+ assertFalse(manager.getAllFileNamesByPath("root.vehicle.device").isEmpty());
+ assertFalse(manager.getAllFileNamesByPath("root.vehicle.device.sensor").isEmpty());
+ assertTrue(manager.getAllFileNamesByPath("root.vehicle1").isEmpty());
+ assertTrue(manager.getAllFileNamesByPath("root.vehicle1.device").isEmpty());
manager.setStorageLevelToMTree("root.vehicle1.device");
- assertEquals(false, manager.checkStorageExistOfPath("root.vehicle1.device1"));
- assertEquals(false, manager.checkStorageExistOfPath("root.vehicle1.device2"));
- assertEquals(false, manager.checkStorageExistOfPath("root.vehicle1.device3"));
- assertEquals(true, manager.checkStorageExistOfPath("root.vehicle1.device"));
+ assertTrue(manager.getAllFileNamesByPath("root.vehicle1.device1").isEmpty());
+ assertTrue(manager.getAllFileNamesByPath("root.vehicle1.device2").isEmpty());
+ assertTrue(manager.getAllFileNamesByPath("root.vehicle1.device3").isEmpty());
+ assertFalse(manager.getAllFileNamesByPath("root.vehicle1.device").isEmpty());
} catch (PathErrorException | IOException e) {
e.printStackTrace();
fail(e.getMessage());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
index 4f838e1..af1175a 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -286,27 +287,28 @@ public class MTreeTest {
}
}
+ @Test
public void testCheckStorageExistOfPath() {
// set storage group first
MTree root = new MTree("root");
try {
- assertEquals(false, root.checkStorageExistOfPath("root"));
- assertEquals(false, root.checkStorageExistOfPath("root.vehicle"));
- assertEquals(false, root.checkStorageExistOfPath("root.vehicle.device"));
- assertEquals(false, root.checkStorageExistOfPath("root.vehicle.device.sensor"));
+ assertTrue(root.getAllFileNamesByPath("root").isEmpty());
+ assertTrue(root.getAllFileNamesByPath("root.vehicle").isEmpty());
+ assertTrue(root.getAllFileNamesByPath("root.vehicle.device").isEmpty());
+ assertTrue(root.getAllFileNamesByPath("root.vehicle.device.sensor").isEmpty());
root.setStorageGroup("root.vehicle");
- assertEquals(true, root.checkStorageExistOfPath("root.vehicle"));
- assertEquals(true, root.checkStorageExistOfPath("root.vehicle.device"));
- assertEquals(true, root.checkStorageExistOfPath("root.vehicle.device.sensor"));
- assertEquals(false, root.checkStorageExistOfPath("root.vehicle1"));
- assertEquals(false, root.checkStorageExistOfPath("root.vehicle1.device"));
+ assertFalse(root.getAllFileNamesByPath("root.vehicle").isEmpty());
+ assertFalse(root.getAllFileNamesByPath("root.vehicle.device").isEmpty());
+ assertFalse(root.getAllFileNamesByPath("root.vehicle.device.sensor").isEmpty());
+ assertTrue(root.getAllFileNamesByPath("root.vehicle1").isEmpty());
+ assertTrue(root.getAllFileNamesByPath("root.vehicle1.device").isEmpty());
root.setStorageGroup("root.vehicle1.device");
- assertEquals(false, root.checkStorageExistOfPath("root.vehicle1.device1"));
- assertEquals(false, root.checkStorageExistOfPath("root.vehicle1.device2"));
- assertEquals(false, root.checkStorageExistOfPath("root.vehicle1.device3"));
- assertEquals(true, root.checkStorageExistOfPath("root.vehicle1.device"));
+ assertTrue(root.getAllFileNamesByPath("root.vehicle1.device1").isEmpty());
+ assertTrue(root.getAllFileNamesByPath("root.vehicle1.device2").isEmpty());
+ assertTrue(root.getAllFileNamesByPath("root.vehicle1.device3").isEmpty());
+ assertFalse(root.getAllFileNamesByPath("root.vehicle1.device").isEmpty());
} catch (PathErrorException e) {
e.printStackTrace();
fail(e.getMessage());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
new file mode 100644
index 0000000..9e1adc7
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MetadataTest {
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testCombineMetadatas() {
+ MManager manager = MManager.getInstance();
+
+ try {
+ manager.setStorageLevelToMTree("root.t.d1");
+ manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+ manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+ manager.setStorageLevelToMTree("root.t.d2");
+ manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+ Metadata metadata1 = manager.getMetadata();
+
+ manager.clear();
+
+ manager.setStorageLevelToMTree("root.t.d3");
+ manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+ manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+ manager.setStorageLevelToMTree("root.t1.d1");
+ manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+ manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+ Metadata metadata2 = manager.getMetadata();
+
+ manager.clear();
+
+ manager.setStorageLevelToMTree("root.t.d1");
+ manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+ manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+ manager.setStorageLevelToMTree("root.t.d2");
+ manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+ manager.setStorageLevelToMTree("root.t.d3");
+ manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+ manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+ manager.setStorageLevelToMTree("root.t1.d1");
+ manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+ manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+ Metadata metadata = manager.getMetadata();
+
+ Metadata combineMetadata = Metadata.combineMetadatas(new Metadata[]{metadata1, metadata2});
+ assertTrue(metadata.equals(combineMetadata));
+ } catch (PathErrorException | IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/QueryProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/QueryProcessorTest.java
index 4145936..e9daf05 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/QueryProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/QueryProcessorTest.java
@@ -20,22 +20,76 @@ package org.apache.iotdb.db.qp;
import static org.junit.Assert.assertEquals;
-import org.apache.iotdb.db.exception.ArgsErrorException;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.exception.qp.QueryProcessorException;
+import java.util.Collections;
+import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.utils.MemIntQpExecutor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
public class QueryProcessorTest {
- private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());
+ private CompressionType compressionType = CompressionType.valueOf(TSFileConfig.compressor);
+ private MManager mManager = MManager.getInstance();
+ private QueryProcessor processor = new QueryProcessor(new OverflowQPExecutor());
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ mManager.setStorageLevelToMTree("root.vehicle");
+ mManager.setStorageLevelToMTree("root.vehicle1");
+ mManager.addPathToMTree("root.vehicle.device1.sensor1", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle.device1.sensor2", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle.device1.sensor3", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle.device2.sensor1", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle.device2.sensor2", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle.device2.sensor3", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle1.device1.sensor1", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle1.device1.sensor2", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle1.device1.sensor3", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle1.device2.sensor1", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle1.device2.sensor2", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ mManager.addPathToMTree("root.vehicle1.device2.sensor3", TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"), compressionType, Collections
+ .emptyMap());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
@Test
- public void parseSQLToPhysicalPlan()
- throws ArgsErrorException, ProcessorException, QueryProcessorException {
+ public void parseSQLToPhysicalPlan() throws Exception {
String createSGStatement = "set storage group to root.vehicle";
PhysicalPlan plan1 = processor.parseSQLToPhysicalPlan(createSGStatement);
assertEquals(OperatorType.SET_STORAGE_GROUP, plan1.getOperatorType());
@@ -56,5 +110,25 @@ public class QueryProcessorTest {
PhysicalPlan plan5 = processor.parseSQLToPhysicalPlan(propertyStatement);
assertEquals(OperatorType.PROPERTY, plan5.getOperatorType());
+ String deleteStatement = "DELETE FROM root.device0.sensor0,root.device0.sensor1 WHERE time <= 5000";
+ PhysicalPlan plan6 = processor.parseSQLToPhysicalPlan(deleteStatement);
+ assertEquals(OperatorType.DELETE, plan6.getOperatorType());
+
+ String queryStatement = "select * from root.vehicle where root.vehicle.device1.sensor1 > 50";
+ PhysicalPlan plan7 = processor.parseSQLToPhysicalPlan(queryStatement);
+ assertEquals(OperatorType.QUERY, plan7.getOperatorType());
+
+ String aggregationStatement = "select sum(*) from root.vehicle where root.vehicle.device1.sensor1 > 50";
+ PhysicalPlan plan8 = processor.parseSQLToPhysicalPlan(aggregationStatement);
+ assertEquals(OperatorType.AGGREGATION, plan8.getOperatorType());
+
+ String groupbyStatement = "select sum(*) from root.vehicle where root.vehicle.device1.sensor1 > 50 group by (20ms, [100,1100])";
+ PhysicalPlan plan9 = processor.parseSQLToPhysicalPlan(groupbyStatement);
+ assertEquals(OperatorType.GROUPBY, plan9.getOperatorType());
+
+ String fillStatement = "select sensor1 from root.vehicle.device1 where time = 50 Fill(int32[linear, 5m, 5m], boolean[previous, 5m])";
+ PhysicalPlan plan10 = processor.parseSQLToPhysicalPlan(fillStatement);
+ assertEquals(OperatorType.FILL, plan10.getOperatorType());
+
}
}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransferTest.java
similarity index 99%
rename from iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
rename to iotdb/src/test/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransferTest.java
index 6a23f0d..c747fb6 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransferTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
index 7caaa59..7baf34c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
@@ -188,7 +188,7 @@ public class QPUpdateTest {
String sqlStr = "insert into root.qp_update_test.device_1 (timestamp, sensor_1, sensor_2) values (13, 50, 40)";
PhysicalPlan plan1 = processor.parseSQLToPhysicalPlan(sqlStr);
- // executeWithGlobalTimeFilter insert
+ // execute insert
boolean upRet = processor.getExecutor().processNonQuery(plan1);
assertTrue(upRet);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index de521c0..71df644 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.EngineQueryRouter;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -161,7 +162,7 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
}
@Override
- protected boolean delete(Path path, long deleteTime) {
+ public boolean delete(Path path, long deleteTime) {
if (!demoMemDataBase.containsKey(path.toString())) {
return true;
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java
index 6dec434..b53e4d8 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java
@@ -24,8 +24,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.apache.iotdb.db.qp.QueryProcessor;
-import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
index b4f808d..066f05f 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.query.executor;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.db.query.executor.groupby.GroupByEngineDataSet;
-import org.apache.iotdb.db.query.executor.groupby.GroupByWithValueFilterDataSetDataSet;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.Assert;
import org.junit.Test;
@@ -39,7 +39,7 @@ public class GroupByEngineDataSetTest {
long[] startTimeArray = {805, 810, 830};
long[] endTimeArray = {810, 830, 850};
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(jobId, null, unit, startTimePoint, pairList);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(jobId, null, unit, startTimePoint, pairList);
int cnt = 0;
while (groupByEngine.hasNext()){
Pair pair = groupByEngine.nextTimePartition();
@@ -64,7 +64,7 @@ public class GroupByEngineDataSetTest {
long[] startTimeArray = {805, 810, 830, 850, 1200, 1210};
long[] endTimeArray = {810, 830, 850, 870, 1210, 1230};
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(jobId, null, unit, startTimePoint, pairList);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(jobId, null, unit, startTimePoint, pairList);
int cnt = 0;
while (groupByEngine.hasNext()){
Pair pair = groupByEngine.nextTimePartition();
@@ -89,7 +89,7 @@ public class GroupByEngineDataSetTest {
long[] startTimeArray = {805, 820, 850, 860, 1200, 1220};
long[] endTimeArray = {820, 840, 860, 880, 1220, 1240};
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(jobId, null, unit, startTimePoint, pairList);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(jobId, null, unit, startTimePoint, pairList);
int cnt = 0;
while (groupByEngine.hasNext()){
Pair pair = groupByEngine.nextTimePartition();
@@ -114,7 +114,7 @@ public class GroupByEngineDataSetTest {
long[] startTimeArray = {805, 1200};
long[] endTimeArray = {900, 1300};
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(jobId, null, unit, startTimePoint, pairList);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(jobId, null, unit, startTimePoint, pairList);
int cnt = 0;
while (groupByEngine.hasNext()){
Pair pair = groupByEngine.nextTimePartition();
@@ -139,7 +139,7 @@ public class GroupByEngineDataSetTest {
long[] startTimeArray = {50, 585, 590};
long[] endTimeArray = {110, 590, 670};
- GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(jobId, null, unit, startTimePoint, pairList);
+ GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(jobId, null, unit, startTimePoint, pairList);
int cnt = 0;
while (groupByEngine.hasNext()){
Pair pair = groupByEngine.nextTimePartition();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 485a33c..be34d32 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -34,7 +34,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.exception.SysCheckException;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.writelog.io.LogWriter;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
import org.junit.Test;
public class WalCheckerTest {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 3f55893..31f12fd 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -155,7 +155,7 @@ public class EnvironmentUtils {
config.setEnableMemMonitor(false);
// disable the system monitor
config.setEnableStatMonitor(false);
- IAuthorizer authorizer = null;
+ IAuthorizer authorizer;
try {
authorizer = LocalFileAuthorizer.getInstance();
} catch (AuthException e) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index eb0a117..17777e7 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.RecoverException;
-import org.apache.iotdb.db.exception.WALOverSizedException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -35,7 +34,7 @@ import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index d125ebc..8b1c241 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.After;
import org.junit.Assert;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index 20d15fd..dc40b84 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -25,12 +25,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.iotdb.db.exception.WALOverSizedException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.Before;
import org.junit.Test;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index fac4765..aeb789e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.read.common;
+import java.io.Serializable;
import java.util.ArrayList;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -28,8 +29,9 @@ import org.apache.iotdb.tsfile.utils.Binary;
* <code>BatchData</code> is a self-defined data structure which is optimized for different type of
* values. This class can be viewed as a collection which is more efficient than ArrayList.
*/
-public class BatchData {
+public class BatchData implements Serializable {
+ private static final long serialVersionUID = -4620310601188394839L;
private int timeCapacity = 1;
private int valueCapacity = 1;
private int emptyTimeCapacity = 1;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Field.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Field.java
index 421e296..2bb4a4d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Field.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Field.java
@@ -132,4 +132,26 @@ public class Field {
public boolean isNull() {
return this.isNull;
}
+
+ public Object getObjectValue(TSDataType dataType) {
+ if (isNull) {
+ return null;
+ }
+ switch (dataType) {
+ case DOUBLE:
+ return getDoubleV();
+ case FLOAT:
+ return getFloatV();
+ case INT64:
+ return getLongV();
+ case INT32:
+ return getIntV();
+ case BOOLEAN:
+ return getBoolV();
+ case TEXT:
+ return getBinaryV();
+ default:
+ throw new UnSupportedDataTypeException("UnSupported: " + dataType);
+ }
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
index 283cbe3..54d9f73 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.read.common;
+import java.io.Serializable;
import org.apache.iotdb.tsfile.common.constant.SystemConstant;
import org.apache.iotdb.tsfile.utils.StringContainer;
import org.slf4j.Logger;
@@ -31,8 +32,9 @@ import java.io.IOException;
*
* @author Kangrong
*/
-public class Path {
+public class Path implements Serializable {
+ private static final long serialVersionUID = 3405277066329298200L;
private String measurement = null;
private String device = null;
private String fullPath;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/ExpressionType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/ExpressionType.java
index 4bffddb..71e4c11 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/ExpressionType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/ExpressionType.java
@@ -19,5 +19,5 @@
package org.apache.iotdb.tsfile.read.expression;
public enum ExpressionType {
- AND, OR, SERIES, GLOBAL_TIME
+ AND, OR, SERIES, GLOBAL_TIME, TRUE
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IBinaryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IBinaryExpression.java
index b97310a..1d478ed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IBinaryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IBinaryExpression.java
@@ -21,10 +21,14 @@ package org.apache.iotdb.tsfile.read.expression;
/**
* @author Jinrui Zhang
*/
-public interface IBinaryExpression extends IExpression {
+public interface IBinaryExpression extends IExpression{
IExpression getLeft();
IExpression getRight();
+ void setLeft(IExpression leftExpression);
+
+ void setRight(IExpression rightExpression);
+
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
index a05b842..a039311 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
@@ -18,7 +18,9 @@
*/
package org.apache.iotdb.tsfile.read.expression;
-public interface IExpression {
+public interface IExpression{
ExpressionType getType();
+
+ IExpression clone();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
index 60e68e0..4a6d4eb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
@@ -18,11 +18,14 @@
*/
package org.apache.iotdb.tsfile.read.expression.impl;
+import java.io.Serializable;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
import org.apache.iotdb.tsfile.read.expression.IExpression;
-public abstract class BinaryExpression implements IBinaryExpression {
+public abstract class BinaryExpression implements IBinaryExpression, Serializable {
+
+ private static final long serialVersionUID = -711801318534904452L;
public static AndExpression and(IExpression left, IExpression right) {
return new AndExpression(left, right);
@@ -32,6 +35,9 @@ public abstract class BinaryExpression implements IBinaryExpression {
return new OrExpression(left, right);
}
+ @Override
+ public abstract IExpression clone();
+
protected static class AndExpression extends BinaryExpression {
public IExpression left;
@@ -53,11 +59,26 @@ public abstract class BinaryExpression implements IBinaryExpression {
}
@Override
+ public void setLeft(IExpression leftExpression) {
+ this.left = leftExpression;
+ }
+
+ @Override
+ public void setRight(IExpression rightExpression) {
+ this.right = rightExpression;
+ }
+
+ @Override
public ExpressionType getType() {
return ExpressionType.AND;
}
@Override
+ public IExpression clone() {
+ return new AndExpression(left.clone(), right.clone());
+ }
+
+ @Override
public String toString() {
return "[" + left + " && " + right + "]";
}
@@ -84,11 +105,26 @@ public abstract class BinaryExpression implements IBinaryExpression {
}
@Override
+ public void setLeft(IExpression leftExpression) {
+ this.left = leftExpression;
+ }
+
+ @Override
+ public void setRight(IExpression rightExpression) {
+ this.right = rightExpression;
+ }
+
+ @Override
public ExpressionType getType() {
return ExpressionType.OR;
}
@Override
+ public IExpression clone() {
+ return new OrExpression(left.clone(), right.clone());
+ }
+
+ @Override
public String toString() {
return "[" + left + " || " + right + "]";
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
index d69a65a..685b948 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
@@ -18,12 +18,15 @@
*/
package org.apache.iotdb.tsfile.read.expression.impl;
+import java.io.Serializable;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-public class GlobalTimeExpression implements IUnaryExpression {
+public class GlobalTimeExpression implements IUnaryExpression, Serializable {
+ private static final long serialVersionUID = 1146132942359113670L;
private Filter filter;
public GlobalTimeExpression(Filter filter) {
@@ -46,6 +49,11 @@ public class GlobalTimeExpression implements IUnaryExpression {
}
@Override
+ public IExpression clone() {
+ return new GlobalTimeExpression(filter.clone());
+ }
+
+ @Override
public String toString() {
return "[" + this.filter.toString() + "]";
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
index b2c2b24..2c888e1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
@@ -18,13 +18,16 @@
*/
package org.apache.iotdb.tsfile.read.expression.impl;
+import java.io.Serializable;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-public class SingleSeriesExpression implements IUnaryExpression {
+public class SingleSeriesExpression implements IUnaryExpression, Serializable {
+ private static final long serialVersionUID = 7131207370394865228L;
private Path seriesPath;
private Filter filter;
@@ -39,6 +42,11 @@ public class SingleSeriesExpression implements IUnaryExpression {
}
@Override
+ public IExpression clone() {
+ return new SingleSeriesExpression(seriesPath.clone(), filter.clone());
+ }
+
+ @Override
public Filter getFilter() {
return filter;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
index aeba875..fc23983 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
@@ -47,4 +47,7 @@ public abstract class BinaryFilter implements Filter, Serializable {
public String toString() {
return "( " + left + "," + right + " )";
}
+
+ @Override
+ public abstract Filter clone();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
index cabaf4f..03e2a84 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
@@ -61,4 +61,6 @@ public interface Filter {
* @param endTime end time of a partition
*/
boolean containStartEndTime(long startTime, long endTime);
+
+ Filter clone();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java
index ff66ef5..e2d46d4 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java
@@ -49,4 +49,7 @@ public abstract class UnaryFilter<T extends Comparable<T>> implements Filter, Se
@Override
public abstract String toString();
+
+ @Override
+ public abstract Filter clone();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/AndFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/AndFilter.java
index fdeed48..19ccc6e 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/AndFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/AndFilter.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
*/
public class AndFilter extends BinaryFilter {
- private static final long serialVersionUID = 6705254093824897938L;
+ private static final long serialVersionUID = -8212850098906044102L;
public AndFilter(Filter left, Filter right) {
super(left, right);
@@ -59,4 +59,9 @@ public class AndFilter extends BinaryFilter {
public String toString() {
return "(" + left + " && " + right + ")";
}
+
+ @Override
+ public Filter clone() {
+ return new AndFilter(left.clone(), right.clone());
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Eq.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Eq.java
index 90ac31a..6c3b587 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Eq.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Eq.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.tsfile.read.filter.operator;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
@@ -79,6 +80,11 @@ public class Eq<T extends Comparable<T>> extends UnaryFilter<T> {
}
@Override
+ public Filter clone() {
+ return new Eq(value, filterType);
+ }
+
+ @Override
public String toString() {
return getFilterType() + " == " + value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Gt.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Gt.java
index 21fa6e4..0b12713 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Gt.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Gt.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.tsfile.read.filter.operator;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
@@ -78,6 +79,11 @@ public class Gt<T extends Comparable<T>> extends UnaryFilter<T> {
}
@Override
+ public Filter clone() {
+ return new Gt(value, filterType);
+ }
+
+ @Override
public String toString() {
return getFilterType() + " > " + value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/GtEq.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/GtEq.java
index 7c8ba53..5d1cb6d 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/GtEq.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/GtEq.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.tsfile.read.filter.operator;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
@@ -78,6 +79,11 @@ public class GtEq<T extends Comparable<T>> extends UnaryFilter<T> {
}
@Override
+ public Filter clone() {
+ return new GtEq(value, filterType);
+ }
+
+ @Override
public String toString() {
return getFilterType() + " >= " + value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Lt.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Lt.java
index eb0cdf7..3385241 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Lt.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Lt.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.tsfile.read.filter.operator;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
@@ -78,6 +79,11 @@ public class Lt<T extends Comparable<T>> extends UnaryFilter<T> {
}
@Override
+ public Filter clone() {
+ return new Lt(value, filterType);
+ }
+
+ @Override
public String toString() {
return getFilterType() + " < " + value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/LtEq.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/LtEq.java
index 7c6800f..b1cef19 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/LtEq.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/LtEq.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.tsfile.read.filter.operator;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
@@ -78,6 +79,11 @@ public class LtEq<T extends Comparable<T>> extends UnaryFilter<T> {
}
@Override
+ public Filter clone() {
+ return new LtEq(value, filterType);
+ }
+
+ @Override
public String toString() {
return getFilterType() + " <= " + value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotEq.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotEq.java
index ad1c42f..0b92639 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotEq.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotEq.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.tsfile.read.filter.operator;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
@@ -79,6 +80,11 @@ public class NotEq<T extends Comparable<T>> extends UnaryFilter<T> {
}
@Override
+ public Filter clone() {
+ return new NotEq(value, filterType);
+ }
+
+ @Override
public String toString() {
return getFilterType() + " != " + value;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java
index 473b3b0..9ebbbeb 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java
@@ -58,6 +58,11 @@ public class NotFilter implements Filter, Serializable {
return !that.satisfyStartEndTime(startTime, endTime);
}
+ @Override
+ public Filter clone() {
+ return new NotFilter(that.clone());
+ }
+
public Filter getFilter() {
return this.that;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/OrFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/OrFilter.java
index 856cc45..dbe538d 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/OrFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/OrFilter.java
@@ -40,6 +40,11 @@ public class OrFilter extends BinaryFilter implements Serializable {
}
@Override
+ public Filter clone() {
+ return new OrFilter(left.clone(), right.clone());
+ }
+
+ @Override
public boolean satisfy(DigestForFilter digest) {
return left.satisfy(digest) || right.satisfy(digest);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
index 557cedc..8659c40 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
@@ -84,36 +84,4 @@ public class DataSetWithTimeGenerator extends QueryDataSet {
return rowRecord;
}
-
- private Field getField(Object value, TSDataType dataType) {
- Field field = new Field(dataType);
-
- if (value == null) {
- field.setNull();
- return field;
- }
- switch (dataType) {
- case DOUBLE:
- field.setDoubleV((double) value);
- break;
- case FLOAT:
- field.setFloatV((float) value);
- break;
- case INT64:
- field.setLongV((long) value);
- break;
- case INT32:
- field.setIntV((int) value);
- break;
- case BOOLEAN:
- field.setBoolV((boolean) value);
- break;
- case TEXT:
- field.setBinaryV((Binary) value);
- break;
- default:
- throw new UnSupportedDataTypeException("UnSupported" + String.valueOf(dataType));
- }
- return field;
- }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index c46da70..7f59031 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -20,9 +20,12 @@ package org.apache.iotdb.tsfile.read.query.dataset;
import java.io.IOException;
import java.util.List;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.utils.Binary;
public abstract class QueryDataSet {
@@ -59,4 +62,37 @@ public abstract class QueryDataSet {
public void setDataTypes(List<TSDataType> dataTypes) {
this.dataTypes = dataTypes;
}
+
+ protected Field getField(Object value, TSDataType dataType) {
+ Field field = new Field(dataType);
+
+ if (value == null) {
+ field.setNull();
+ return field;
+ }
+
+ switch (dataType) {
+ case DOUBLE:
+ field.setDoubleV((double) value);
+ break;
+ case FLOAT:
+ field.setFloatV((float) value);
+ break;
+ case INT64:
+ field.setLongV((long) value);
+ break;
+ case INT32:
+ field.setIntV((int) value);
+ break;
+ case BOOLEAN:
+ field.setBoolV((boolean) value);
+ break;
+ case TEXT:
+ field.setBinaryV((Binary) value);
+ break;
+ default:
+ throw new UnSupportedDataTypeException("UnSupported: " + dataType);
+ }
+ return field;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Pair.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Pair.java
index ef88272..60995db 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Pair.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Pair.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.tsfile.utils;
+import java.io.Serializable;
+
/**
* Pair is a template class to represent a couple of values. It also overrides the Object basic
* methods like hashCode, equals and toString.
@@ -26,8 +28,9 @@ package org.apache.iotdb.tsfile.utils;
* @param <R> R type
* @author kangrong
*/
-public class Pair<L, R> {
+public class Pair<L, R> implements Serializable {
+ private static final long serialVersionUID = -1398609631703707002L;
public L left;
public R right;