You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/14 08:20:12 UTC

[iotdb] branch ty-mpp created (now 01a35ea6d4)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 01a35ea6d4 add DataNodeTSIServiceImpl

This branch includes the following new commits:

     new 01a35ea6d4 add DataNodeTSIServiceImpl

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add DataNodeTSIServiceImpl

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 01a35ea6d42ad114f13314ef57fba85174d4e820
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Apr 14 16:19:52 2022 +0800

    add DataNodeTSIServiceImpl
---
 .../org/apache/iotdb/db/qp/sql/SqlLexer.tokens     | 245 +++++++
 .../iotdb/commons/service/ThriftService.java       |   8 +-
 .../resources/conf/iotdb-engine.properties         |   3 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   3 +
 .../iotdb/db/mpp/common/MPPQueryContext.java       |  10 +-
 .../iotdb/db/mpp/common/header/DatasetHeader.java  |  13 +-
 .../iotdb/db/mpp/execution/ConfigExecution.java    |  44 +-
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  18 +-
 .../iotdb/db/mpp/execution/IQueryExecution.java    |  13 +
 .../iotdb/db/mpp/execution/QueryExecution.java     |  28 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  | 334 +++++-----
 .../org/apache/iotdb/db/service/RPCService.java    |   6 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 724 +++++++++++++++++++++
 .../db/service/thrift/impl/TSServiceImpl.java      | 364 +----------
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |   4 +-
 15 files changed, 1261 insertions(+), 556 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.tokens b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.tokens
new file mode 100644
index 0000000000..30a7c1b297
--- /dev/null
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.tokens
@@ -0,0 +1,245 @@
+WS=1
+ADD=2
+AFTER=3
+ALIAS=4
+ALIGN=5
+ALIGNED=6
+ALL=7
+ALTER=8
+ANY=9
+APPEND=10
+AS=11
+ASC=12
+ATTRIBUTES=13
+AUTOREGISTER=14
+BEFORE=15
+BEGIN=16
+BOUNDARY=17
+BY=18
+CACHE=19
+CHILD=20
+CLEAR=21
+COMPRESSION=22
+COMPRESSOR=23
+CONCAT=24
+CONFIGURATION=25
+CONTINUOUS=26
+COUNT=27
+CONTAIN=28
+CQ=29
+CQS=30
+CREATE=31
+DATATYPE=32
+DEBUG=33
+DELETE=34
+DESC=35
+DESCRIBE=36
+DEVICE=37
+DEVICES=38
+DISABLE=39
+DROP=40
+ENCODING=41
+END=42
+EVERY=43
+EXPLAIN=44
+FILL=45
+FLUSH=46
+FOR=47
+FROM=48
+FULL=49
+FUNCTION=50
+FUNCTIONS=51
+GLOBAL=52
+GRANT=53
+GROUP=54
+INDEX=55
+INFO=56
+INSERT=57
+INTO=58
+KILL=59
+LABEL=60
+LAST=61
+LATEST=62
+LEVEL=63
+LIKE=64
+LIMIT=65
+LINEAR=66
+LINK=67
+LIST=68
+LOAD=69
+LOCK=70
+MERGE=71
+METADATA=72
+NODES=73
+NOW=74
+OF=75
+OFF=76
+OFFSET=77
+ON=78
+ORDER=79
+PARTITION=80
+PASSWORD=81
+PATHS=82
+PREVIOUS=83
+PREVIOUSUNTILLAST=84
+PRIVILEGES=85
+PROCESSLIST=86
+PROPERTY=87
+PRUNE=88
+QUERIES=89
+QUERY=90
+READONLY=91
+REGEXP=92
+REMOVE=93
+RENAME=94
+RESAMPLE=95
+RESOURCE=96
+REVOKE=97
+ROLE=98
+ROOT=99
+SCHEMA=100
+SELECT=101
+SET=102
+SETTLE=103
+SGLEVEL=104
+SHOW=105
+SLIMIT=106
+SOFFSET=107
+STORAGE=108
+START=109
+STOP=110
+SYSTEM=111
+TAGS=112
+TASK=113
+TEMPLATE=114
+TEMPLATES=115
+TIME=116
+TIMESERIES=117
+TIMESTAMP=118
+TO=119
+TOLERANCE=120
+TOP=121
+TRACING=122
+TRIGGER=123
+TRIGGERS=124
+TTL=125
+UNLINK=126
+UNLOAD=127
+UNSET=128
+UPDATE=129
+UPSERT=130
+USER=131
+USING=132
+VALUES=133
+VERIFY=134
+VERSION=135
+WATERMARK_EMBEDDING=136
+WHERE=137
+WITH=138
+WITHOUT=139
+WRITABLE=140
+DATATYPE_VALUE=141
+BOOLEAN=142
+DOUBLE=143
+FLOAT=144
+INT32=145
+INT64=146
+TEXT=147
+ENCODING_VALUE=148
+DICTIONARY=149
+DIFF=150
+GORILLA=151
+PLAIN=152
+REGULAR=153
+RLE=154
+TS_2DIFF=155
+ZIGZAG=156
+FREQ=157
+COMPRESSOR_VALUE=158
+GZIP=159
+LZ4=160
+SNAPPY=161
+UNCOMPRESSED=162
+PRIVILEGE_VALUE=163
+SET_STORAGE_GROUP=164
+DELETE_STORAGE_GROUP=165
+CREATE_TIMESERIES=166
+INSERT_TIMESERIES=167
+READ_TIMESERIES=168
+DELETE_TIMESERIES=169
+CREATE_USER=170
+DELETE_USER=171
+MODIFY_PASSWORD=172
+LIST_USER=173
+GRANT_USER_PRIVILEGE=174
+REVOKE_USER_PRIVILEGE=175
+GRANT_USER_ROLE=176
+REVOKE_USER_ROLE=177
+CREATE_ROLE=178
+DELETE_ROLE=179
+LIST_ROLE=180
+GRANT_ROLE_PRIVILEGE=181
+REVOKE_ROLE_PRIVILEGE=182
+CREATE_FUNCTION=183
+DROP_FUNCTION=184
+CREATE_TRIGGER=185
+DROP_TRIGGER=186
+START_TRIGGER=187
+STOP_TRIGGER=188
+CREATE_CONTINUOUS_QUERY=189
+DROP_CONTINUOUS_QUERY=190
+MINUS=191
+PLUS=192
+DIV=193
+MOD=194
+OPERATOR_DEQ=195
+OPERATOR_SEQ=196
+OPERATOR_GT=197
+OPERATOR_GTE=198
+OPERATOR_LT=199
+OPERATOR_LTE=200
+OPERATOR_NEQ=201
+OPERATOR_IN=202
+OPERATOR_AND=203
+OPERATOR_OR=204
+OPERATOR_NOT=205
+OPERATOR_CONTAINS=206
+DOT=207
+COMMA=208
+SEMI=209
+STAR=210
+DOUBLE_STAR=211
+LR_BRACKET=212
+RR_BRACKET=213
+LS_BRACKET=214
+RS_BRACKET=215
+STRING_LITERAL=216
+DURATION_LITERAL=217
+DATETIME_LITERAL=218
+INTEGER_LITERAL=219
+EXPONENT_NUM_PART=220
+BOOLEAN_LITERAL=221
+NULL_LITERAL=222
+NAN_LITERAL=223
+ID=224
+QUTOED_ID_IN_NODE_NAME=225
+QUTOED_ID=226
+'-'=191
+'+'=192
+'/'=193
+'%'=194
+'=='=195
+'='=196
+'>'=197
+'>='=198
+'<'=199
+'<='=200
+'.'=207
+','=208
+';'=209
+'*'=210
+'**'=211
+'('=212
+')'=213
+'['=214
+']'=215
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
index e792d88c81..30d75a94eb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
@@ -26,6 +26,7 @@ import org.apache.thrift.TProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.concurrent.CountDownLatch;
 
 public abstract class ThriftService implements IService {
@@ -88,7 +89,8 @@ public abstract class ThriftService implements IService {
   }
 
   public abstract void initTProcessor()
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException;
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+          NoSuchMethodException, InvocationTargetException;
 
   public abstract void initThriftServiceThread()
       throws IllegalAccessException, InstantiationException, ClassNotFoundException;
@@ -125,7 +127,9 @@ public abstract class ThriftService implements IService {
     } catch (InterruptedException
         | ClassNotFoundException
         | IllegalAccessException
-        | InstantiationException e) {
+        | InstantiationException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
       Thread.currentThread().interrupt();
       throw new StartupException(this.getID().getName(), e.getMessage());
     }
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 8a6673dd85..c3b4f4fe2e 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -27,6 +27,9 @@ rpc_address=0.0.0.0
 # Datatype: int
 rpc_port=6667
 
+# Client RPC Service Impl Class Name: TSServiceImpl or DataNodeTSIServiceImpl
+# rpc_class_name=TSServiceImpl
+
 # Datatype: int
 mpp_port=7777
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5ed958b579..1eeab2bfd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -167,6 +167,9 @@ public class IoTDBDescriptor {
           Integer.parseInt(
               properties.getProperty("rpc_port", Integer.toString(conf.getRpcPort()))));
 
+      conf.setRpcImplClassName(
+          properties.getProperty("rpc_class_name", conf.getRpcImplClassName()).trim());
+
       conf.setMppPort(
           Integer.parseInt(
               properties.getProperty("mpp_port", Integer.toString(conf.getRpcPort()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index def89dba39..b449bdf9e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -29,7 +29,7 @@ public class MPPQueryContext {
   private String sql;
   private QueryId queryId;
   private SessionInfo session;
-  private QueryType queryType;
+  private QueryType queryType = QueryType.READ;
 
   private Endpoint hostEndpoint;
   private ResultNodeContext resultNodeContext;
@@ -38,12 +38,10 @@ public class MPPQueryContext {
     this.queryId = queryId;
   }
 
-  public MPPQueryContext(
-      String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
+  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, Endpoint hostEndpoint) {
     this.sql = sql;
     this.queryId = queryId;
     this.session = session;
-    this.queryType = type;
     this.hostEndpoint = hostEndpoint;
     this.resultNodeContext = new ResultNodeContext(queryId);
   }
@@ -56,6 +54,10 @@ public class MPPQueryContext {
     return queryType;
   }
 
+  public void setQueryType(QueryType queryType) {
+    this.queryType = queryType;
+  }
+
   public Endpoint getHostEndpoint() {
     return hostEndpoint;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
index 32a236feae..ee3e52d218 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.mpp.common.header;
 
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
 import com.google.common.primitives.Bytes;
 
 import java.util.*;
@@ -51,10 +49,6 @@ public class DatasetHeader {
     return isIgnoreTimestamp;
   }
 
-  public Map<String, Integer> getColumnToTsBlockIndexMap() {
-    return columnToTsBlockIndexMap;
-  }
-
   public void setColumnToTsBlockIndexMap(List<String> outputColumnNames) {
     this.columnToTsBlockIndexMap = new HashMap<>();
     for (int i = 0; i < outputColumnNames.size(); i++) {
@@ -66,8 +60,11 @@ public class DatasetHeader {
     return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
   }
 
-  public List<TSDataType> getRespDataTypeList() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+  public List<String> getRespDataTypeList() {
+    return columnHeaders.stream()
+        .map(ColumnHeader::getColumnType)
+        .map(Objects::toString)
+        .collect(Collectors.toList());
   }
 
   public List<Byte> getRespAliasColumns() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java
index 3d8b5dcdd5..c9cb515e71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java
@@ -20,15 +20,19 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import jersey.repackaged.com.google.common.util.concurrent.SettableFuture;
+import org.jetbrains.annotations.NotNull;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -37,12 +41,12 @@ import static com.google.common.base.Throwables.throwIfInstanceOf;
 
 public class ConfigExecution implements IQueryExecution {
 
-  private MPPQueryContext context;
-  private Statement statement;
-  private ExecutorService executor;
+  private final MPPQueryContext context;
+  private final Statement statement;
+  private final ExecutorService executor;
 
-  private QueryStateMachine stateMachine;
-  private SettableFuture<Boolean> result;
+  private final QueryStateMachine stateMachine;
+  private final SettableFuture<Boolean> result;
 
   public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
     this.context = context;
@@ -67,7 +71,7 @@ public class ConfigExecution implements IQueryExecution {
             }
 
             @Override
-            public void onFailure(Throwable throwable) {
+            public void onFailure(@NotNull Throwable throwable) {
               fail(throwable);
             }
           },
@@ -101,6 +105,34 @@ public class ConfigExecution implements IQueryExecution {
     }
   }
 
+  @Override
+  public TsBlock getBatchResult() {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean hasNextResult() {
+    return false;
+  }
+
+  @Override
+  public int getOutputValueColumnCount() {
+    // TODO
+    return 0;
+  }
+
+  @Override
+  public DatasetHeader getDatasetHeader() {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isQuery() {
+    return context.getQueryType() == QueryType.READ;
+  }
+
   // TODO: consider a more suitable implementation for it
   // Generate the corresponding IConfigTask by statement.
   // Each type of statement will has a ConfigTask
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4749394259..784b2b5999 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -24,9 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import org.apache.commons.lang3.Validate;
 
@@ -50,12 +48,12 @@ public class Coordinator {
           IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
           IoTDBDescriptor.getInstance().getConfig().getMppPort());
 
-  private ExecutorService executor;
-  private ScheduledExecutorService scheduledExecutor;
+  private final ExecutorService executor;
+  private final ScheduledExecutorService scheduledExecutor;
 
   private static final Coordinator INSTANCE = new Coordinator();
 
-  private ConcurrentHashMap<QueryId, QueryExecution> queryExecutionMap;
+  private final ConcurrentHashMap<QueryId, IQueryExecution> queryExecutionMap;
 
   private Coordinator() {
     this.queryExecutionMap = new ConcurrentHashMap<>();
@@ -68,11 +66,11 @@ public class Coordinator {
   }
 
   public ExecutionResult execute(
-      Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
+      Statement statement, QueryId queryId, SessionInfo session, String sql) {
 
     QueryExecution execution =
         createQueryExecution(
-            statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
+            statement, new MPPQueryContext(sql, queryId, session, getHostEndpoint()));
     queryExecutionMap.put(queryId, execution);
 
     execution.start();
@@ -80,10 +78,10 @@ public class Coordinator {
     return execution.getStatus();
   }
 
-  public TsBlock getResultSet(QueryId queryId) {
-    QueryExecution execution = queryExecutionMap.get(queryId);
+  public IQueryExecution getQueryExecution(QueryId queryId) {
+    IQueryExecution execution = queryExecutionMap.get(queryId);
     Validate.notNull(execution, "invalid queryId %s", queryId.getId());
-    return execution.getBatchResult();
+    return execution;
   }
 
   // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
index 3f9fa05ea8..2e0bd9c76d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
 public interface IQueryExecution {
 
   void start();
@@ -26,4 +29,14 @@ public interface IQueryExecution {
   void stop();
 
   ExecutionResult getStatus();
+
+  TsBlock getBatchResult();
+
+  boolean hasNextResult();
+
+  int getOutputValueColumnCount();
+
+  DatasetHeader getDatasetHeader();
+
+  boolean isQuery();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index a5dbeaf21f..885e822ea3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -21,10 +21,12 @@ package org.apache.iotdb.db.mpp.execution;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
@@ -55,18 +57,18 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
  * corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
  */
 public class QueryExecution implements IQueryExecution {
-  private MPPQueryContext context;
+  private final MPPQueryContext context;
   private IScheduler scheduler;
-  private QueryStateMachine stateMachine;
+  private final QueryStateMachine stateMachine;
 
-  private List<PlanOptimizer> planOptimizers;
+  private final List<PlanOptimizer> planOptimizers;
 
   private final Analysis analysis;
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
 
-  private ExecutorService executor;
-  private ScheduledExecutorService scheduledExecutor;
+  private final ExecutorService executor;
+  private final ScheduledExecutorService scheduledExecutor;
 
   // The result of QueryExecution will be written to the DataBlockManager in current Node.
   // We use this SourceHandle to fetch the TsBlock from it.
@@ -153,6 +155,7 @@ public class QueryExecution implements IQueryExecution {
    * DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
    * implemented with DataStreamManager)
    */
+  @Override
   public TsBlock getBatchResult() {
     try {
       initResultHandle();
@@ -171,6 +174,7 @@ public class QueryExecution implements IQueryExecution {
   }
 
   /** @return true if there is more tsblocks, otherwise false */
+  @Override
   public boolean hasNextResult() {
     try {
       initResultHandle();
@@ -182,9 +186,14 @@ public class QueryExecution implements IQueryExecution {
   }
 
   /** return the result column count without the time column */
+  @Override
   public int getOutputValueColumnCount() {
-    // TODO need return the actual size while there exists output columns in Analysis
-    return 1;
+    return analysis.getRespDatasetHeader().getColumnHeaders().size();
+  }
+
+  @Override
+  public DatasetHeader getDatasetHeader() {
+    return analysis.getRespDatasetHeader();
   }
 
   /**
@@ -242,4 +251,9 @@ public class QueryExecution implements IQueryExecution {
   public LogicalQueryPlan getLogicalPlan() {
     return logicalPlan;
   }
+
+  @Override
+  public boolean isQuery() {
+    return context.getQueryType() == QueryType.READ;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 5eca107403..29090cf0bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -173,6 +173,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
       // TODO: do analyze for insert statement
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(insertStatement);
       return analysis;
@@ -234,6 +235,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsertTablet(
         InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
       dataPartitionQueryParam.setTimePartitionSlotList(
           insertTabletStatement.getTimePartitionSlots());
@@ -264,172 +266,9 @@ public class Analyzer {
       return analysis;
     }
 
-    @Override
-    public Analysis visitShowTimeSeries(
-        ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
-      SchemaPartition schemaPartitionInfo =
-          partitionFetcher.fetchSchemaPartitionInfo(
-              showTimeSeriesStatement.getPathPattern().getDevice());
-      Analysis analysis = new Analysis();
-      analysis.setStatement(showTimeSeriesStatement);
-      analysis.setSchemaPartitionInfo(schemaPartitionInfo);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitShowDevices(
-        ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
-      SchemaPartition schemaPartitionInfo =
-          partitionFetcher.fetchSchemaPartitionInfo(
-              showDevicesStatement.getPathPattern().getFullPath());
-      Analysis analysis = new Analysis();
-      analysis.setStatement(showDevicesStatement);
-      analysis.setSchemaPartitionInfo(schemaPartitionInfo);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitCreateUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitCreateRole(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitAlterUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitGrantUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitGrantRole(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitGrantRoleToUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitRevokeUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitRevokeRole(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitRevokeRoleFromUser(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitDropUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitDropRole(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListRole(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListPrivilegesUser(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListPrivilegesRole(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListUserPrivileges(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListRolePrivileges(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListAllRoleOfUser(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListAllUserOfRole(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
     @Override
     public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
       dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
       dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
@@ -470,6 +309,7 @@ public class Analyzer {
     public Analysis visitInsertRows(
         InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
       // TODO remove duplicate
+      context.setQueryType(QueryType.WRITE);
       List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
       for (InsertRowStatement insertRowStatement :
           insertRowsStatement.getInsertRowStatementList()) {
@@ -523,6 +363,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsertMultiTablets(
         InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       // TODO remove duplicate
       List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
       for (InsertTabletStatement insertTabletStatement :
@@ -570,6 +411,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsertRowsOfOneDevice(
         InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
       dataPartitionQueryParam.setDevicePath(
           insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
@@ -615,5 +457,169 @@ public class Analyzer {
 
       return analysis;
     }
+
+    /**
+     * Builds the analysis for a {@link ShowTimeSeriesStatement}.
+     *
+     * <p>The schema partition info is fetched for the device part of the statement's path pattern
+     * and stored on the returned {@link Analysis} together with the statement itself.
+     *
+     * @param showTimeSeriesStatement statement being analyzed
+     * @param context query context; not read by this visitor
+     * @return a new analysis holding the statement and its schema partition info
+     */
+    @Override
+    public Analysis visitShowTimeSeries(
+        ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
+      Analysis result = new Analysis();
+      result.setStatement(showTimeSeriesStatement);
+      // NOTE(review): this visitor keys the lookup on getDevice() while visitShowDevices uses
+      // getFullPath() -- confirm the difference is intended.
+      result.setSchemaPartitionInfo(
+          partitionFetcher.fetchSchemaPartitionInfo(
+              showTimeSeriesStatement.getPathPattern().getDevice()));
+      return result;
+    }
+
+    /**
+     * Builds the analysis for a {@link ShowDevicesStatement}.
+     *
+     * <p>The schema partition info is fetched for the full path of the statement's path pattern
+     * and stored on the returned {@link Analysis} together with the statement itself.
+     *
+     * @param showDevicesStatement statement being analyzed
+     * @param context query context; not read by this visitor
+     * @return a new analysis holding the statement and its schema partition info
+     */
+    @Override
+    public Analysis visitShowDevices(
+        ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
+      Analysis result = new Analysis();
+      result.setStatement(showDevicesStatement);
+      result.setSchemaPartitionInfo(
+          partitionFetcher.fetchSchemaPartitionInfo(
+              showDevicesStatement.getPathPattern().getFullPath()));
+      return result;
+    }
+
+    // The author-statement visitors below do nothing beyond wrapping the statement in a fresh
+    // Analysis, so they all delegate to analyzeAuthorStatement instead of repeating that body.
+
+    @Override
+    public Analysis visitCreateUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitCreateRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitAlterUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitGrantUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitGrantRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitGrantRoleToUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitRevokeUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitRevokeRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitRevokeRoleFromUser(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitDropUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitDropRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitListUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitListRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitListPrivilegesUser(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitListPrivilegesRole(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitListUserPrivileges(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitListRolePrivileges(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitListAllRoleOfUser(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    @Override
+    public Analysis visitListAllUserOfRole(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      return analyzeAuthorStatement(authorStatement);
+    }
+
+    /**
+     * Shared analysis for every author statement: creates a new {@link Analysis} whose only
+     * content is the statement itself.
+     *
+     * @param authorStatement statement being analyzed
+     * @return a new analysis with the statement set
+     */
+    private Analysis analyzeAuthorStatement(AuthorStatement authorStatement) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index dc0ee3987c..a376eaece0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
 
+import java.lang.reflect.InvocationTargetException;
+
 /** A service to handle jdbc request from client. */
 public class RPCService extends ThriftService implements RPCServiceMBean {
 
@@ -47,10 +49,12 @@ public class RPCService extends ThriftService implements RPCServiceMBean {
 
   @Override
   public void initTProcessor()
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+          NoSuchMethodException, InvocationTargetException {
     impl =
         (TSServiceImpl)
             Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName())
+                .getDeclaredConstructor()
                 .newInstance();
     initSyncedServiceImpl(null);
     if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
new file mode 100644
index 0000000000..af372e4319
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -0,0 +1,724 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.execution.Coordinator;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.service.metrics.MetricsService;
+import org.apache.iotdb.db.service.metrics.Operation;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.*;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_TIME_MANAGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+
+public class DataNodeTSIServiceImpl implements TSIService.Iface {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeTSIServiceImpl.class);
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  /**
+   * Opens a session through the session manager and wraps its outcome in a Thrift response.
+   *
+   * @param req request carrying username, password, zone id, client protocol and an optional
+   *     configuration map (consulted for the client version)
+   * @return response built from the session manager's code and message and the
+   *     {@code CURRENT_RPC_VERSION} constant, with the session id the manager returned
+   */
+  @Override
+  public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
+    BasicOpenSessionResp sessionResult =
+        SESSION_MANAGER.openSession(
+            req.username, req.password, req.zoneId, req.client_protocol, parseClientVersion(req));
+    TSOpenSessionResp response =
+        new TSOpenSessionResp(
+            RpcUtils.getStatus(sessionResult.getCode(), sessionResult.getMessage()),
+            CURRENT_RPC_VERSION);
+    return response.setSessionId(sessionResult.getSessionId());
+  }
+
+  /**
+   * Resolves the client version from the request's configuration map.
+   *
+   * @param req open-session request whose {@code configuration} map may be null
+   * @return the version named by the {@code "version"} entry, or {@code V_0_12} when the map is
+   *     null or has no such key
+   */
+  private IoTDBConstant.ClientVersion parseClientVersion(TSOpenSessionReq req) {
+    Map<String, String> clientConfig = req.configuration;
+    if (clientConfig == null || !clientConfig.containsKey("version")) {
+      return IoTDBConstant.ClientVersion.V_0_12;
+    }
+    // NOTE(review): if ClientVersion is an enum, valueOf throws for an unrecognized name and
+    // nothing in this class catches it -- confirm that is the intended failure mode.
+    return IoTDBConstant.ClientVersion.valueOf(clientConfig.get("version"));
+  }
+
+  /**
+   * Closes the session named in the request.
+   *
+   * @param req request carrying the session id
+   * @return a {@code SUCCESS_STATUS} status when the session manager reports the session as
+   *     closed, otherwise a {@code NOT_LOGIN_ERROR} status
+   */
+  @Override
+  public TSStatus closeSession(TSCloseSessionReq req) {
+    boolean closed = SESSION_MANAGER.closeSession(req.sessionId);
+    TSStatusCode outcome = closed ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.NOT_LOGIN_ERROR;
+    return new TSStatus(RpcUtils.getStatus(outcome));
+  }
+
+  /**
+   * Cancellation is not implemented: the request is ignored and a {@code QUERY_NOT_ALLOWED}
+   * status with an explanatory message is always returned.
+   */
+  @Override
+  public TSStatus cancelOperation(TSCancelOperationReq req) {
+    // TODO implement
+    return RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "Cancellation is not implemented");
+  }
+
+  /**
+   * Forwards the close request to the session manager, passing the session, query and statement
+   * ids along with flags telling which of the optional ids were actually set on the request.
+   *
+   * @return the status produced by the session manager
+   */
+  @Override
+  public TSStatus closeOperation(TSCloseOperationReq req) {
+    return SESSION_MANAGER.closeOperation(
+        req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+  }
+
+  /**
+   * Looks up the time zone registered for a session.
+   *
+   * @param sessionId id of the session to query
+   * @return a success response with the zone's string form, or with {@code "Unknown time zone"}
+   *     when the session manager has no zone for the session; if the lookup throws, an error
+   *     response carrying {@code "Unknown time zone"}
+   */
+  @Override
+  public TSGetTimeZoneResp getTimeZone(long sessionId) {
+    try {
+      ZoneId sessionZone = SESSION_MANAGER.getZoneId(sessionId);
+      String zoneName = sessionZone == null ? "Unknown time zone" : sessionZone.toString();
+      return new TSGetTimeZoneResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), zoneName);
+    } catch (Exception e) {
+      TSStatus failure =
+          onNPEOrUnexpectedException(
+              e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR);
+      return new TSGetTimeZoneResp(failure, "Unknown time zone");
+    }
+  }
+
+  /**
+   * Stores the requested time zone for the request's session.
+   *
+   * @param req request carrying the session id and the new time zone
+   * @return a success status, or the status produced by {@code onNPEOrUnexpectedException} (with
+   *     {@code SET_TIME_ZONE_ERROR}) when the session manager throws
+   */
+  @Override
+  public TSStatus setTimeZone(TSSetTimeZoneReq req) {
+    try {
+      SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR);
+    }
+  }
+
+  /**
+   * Describes this server to a client.
+   *
+   * <p>The result carries the IoTDB version, the supported time aggregation operations
+   * ({@code MAX_TIME} and {@code MIN_TIME}) and a set of values read from the current IoTDB
+   * configuration: timestamp precision, maximum concurrent RPC clients, watermark settings, the
+   * read-only flag and the maximum Thrift frame size. The version is also logged at INFO level
+   * on every call.
+   *
+   * @return a freshly populated {@link ServerProperties}
+   */
+  @Override
+  public ServerProperties getProperties() {
+    LOGGER.info("IoTDB server version: {}", IoTDBConstant.VERSION);
+
+    List<String> timeAggregations = new ArrayList<>();
+    timeAggregations.add(IoTDBConstant.MAX_TIME);
+    timeAggregations.add(IoTDBConstant.MIN_TIME);
+
+    ServerProperties serverProperties = new ServerProperties();
+    serverProperties.setVersion(IoTDBConstant.VERSION);
+    serverProperties.setSupportedTimeAggregationOperations(timeAggregations);
+
+    // Everything below is copied straight from the active configuration.
+    serverProperties.setTimestampPrecision(
+        IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
+    serverProperties.setMaxConcurrentClientNum(
+        IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum());
+    serverProperties.setWatermarkSecretKey(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkSecretKey());
+    serverProperties.setWatermarkBitString(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkBitString());
+    serverProperties.setWatermarkParamMarkRate(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMarkRate());
+    serverProperties.setWatermarkParamMaxRightBit(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMaxRightBit());
+    serverProperties.setIsReadOnly(IoTDBDescriptor.getInstance().getConfig().isReadOnly());
+    serverProperties.setThriftMaxFrameSize(
+        IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize());
+    return serverProperties;
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus setStorageGroup(long sessionId, String storageGroup) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus deleteTimeseries(long sessionId, List<String> path) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroup) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Parses and executes a SQL statement through the coordinator.
+   *
+   * <p>For a query, the response carries the dataset header and the first batch of results
+   * (bounded by the request's fetch size); for anything else it carries only the execution
+   * status. Latency is recorded and slow statements are logged whether or not execution
+   * succeeds.
+   *
+   * @param req request carrying the session id, statement id, SQL text and fetch size
+   * @return the execution response; a not-logged-in status when the session fails the login
+   *     check, the coordinator's own status when execution does not succeed, or the status built
+   *     by {@code onQueryException} when parsing or execution throws
+   */
+  @Override
+  public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
+    String statement = req.getStatement();
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.currentTimeMillis();
+    try {
+      // Parsing happens inside the try block so that a malformed statement is reported to the
+      // client as an error status (and still gets latency/slow-log bookkeeping) instead of
+      // escaping the RPC handler as an unchecked exception.
+      Statement s =
+          StatementGenerator.createStatement(
+              statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
+
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      QueryId id = new QueryId(String.valueOf(queryId));
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(s, id, SESSION_MANAGER.getSessionInfo(req.sessionId), statement);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // Return the coordinator's status as-is; wrapping it in an exception with an empty
+        // message would hide the real error code and message from the client.
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
+
+      TSExecuteStatementResp resp;
+      if (queryExecution.isQuery()) {
+        resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+        resp.setStatus(result.status);
+        resp.setQueryDataSet(
+            QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
+      } else {
+        resp = RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      return resp;
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+      }
+    }
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Delegates to {@link #executeStatement(TSExecuteStatementReq)} without extra handling. */
+  @Override
+  public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
+    return executeStatement(req);
+  }
+
+  /** Delegates to {@link #executeStatement(TSExecuteStatementReq)} without extra handling. */
+  @Override
+  public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatement(req);
+  }
+
+  /**
+   * Fetches the next batch of results for a query that is already executing.
+   *
+   * @param req request carrying the session id, query id and fetch size
+   * @return a success response with the converted batch, flagged as aligned and with
+   *     {@code hasResultSet} set when the batch's time buffer is non-empty; a not-logged-in
+   *     response when the session fails the login check; or the status built by
+   *     {@code onQueryException} when anything throws
+   */
+  @Override
+  public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
+      }
+
+      TSFetchResultsResp response = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
+
+      QueryId id = new QueryId(String.valueOf(req.queryId));
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
+      TSQueryDataSet batch =
+          QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize);
+
+      // An empty time buffer means the query produced no more rows.
+      response.setHasResultSet(batch.bufferForTime().limit() != 0);
+      response.setQueryDataSet(batch);
+      response.setIsAlign(true);
+
+      QUERY_TIME_MANAGER.unRegisterQuery(req.queryId, false);
+      return response;
+    } catch (Exception e) {
+      return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+    }
+  }
+
+  /**
+   * Inserts a batch of records, possibly for different devices, through the coordinator.
+   *
+   * @param req request carrying the session id and the records to insert
+   * @return the coordinator's execution status; a not-logged-in status when the session fails
+   *     the login check; or the status built by {@code onNPEOrUnexpectedException} when
+   *     anything throws
+   */
+  @Override
+  public TSStatus insertRecords(TSInsertRecordsReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Only peek at the first record when there is one; otherwise an empty request would fail
+      // with IndexOutOfBoundsException solely because audit debug logging is switched on.
+      if (AUDIT_LOGGER.isDebugEnabled()
+          && !req.prefixPaths.isEmpty()
+          && !req.getTimestamps().isEmpty()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, first device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPaths.get(0),
+            req.getTimestamps().get(0));
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertRecordsReq to Statement
+      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      // NOTE(review): INSERT_TABLET looks copied from insertTablet; confirm whether OperationType
+      // has a constant dedicated to this RPC and use it here.
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  /**
+   * Inserts a batch of records that all belong to one device through the coordinator.
+   *
+   * @param req request carrying the session id, the device path and the records to insert
+   * @return the coordinator's execution status; a not-logged-in status when the session fails
+   *     the login check; or the status built by {@code onNPEOrUnexpectedException} when
+   *     anything throws
+   */
+  @Override
+  public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Only peek at the first timestamp when there is one; otherwise an empty request would
+      // fail with IndexOutOfBoundsException solely because audit debug logging is switched on.
+      if (AUDIT_LOGGER.isDebugEnabled() && !req.getTimestamps().isEmpty()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPath,
+            req.getTimestamps().get(0));
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertRecordsOfOneDeviceReq to Statement
+      InsertRowsOfOneDeviceStatement statement =
+          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      // NOTE(review): INSERT_TABLET looks copied from insertTablet; confirm whether OperationType
+      // has a constant dedicated to this RPC and use it here.
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  /**
+   * Inserts a batch of string-valued records that all belong to one device through the
+   * coordinator.
+   *
+   * @param req request carrying the session id, the device path and the records to insert
+   * @return the coordinator's execution status; a not-logged-in status when the session fails
+   *     the login check; or the status built by {@code onNPEOrUnexpectedException} when
+   *     anything throws
+   */
+  @Override
+  public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Only peek at the first timestamp when there is one; otherwise an empty request would
+      // fail with IndexOutOfBoundsException solely because audit debug logging is switched on.
+      if (AUDIT_LOGGER.isDebugEnabled() && !req.getTimestamps().isEmpty()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPath,
+            req.getTimestamps().get(0));
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertStringRecordsOfOneDeviceReq to Statement
+      InsertRowsOfOneDeviceStatement statement =
+          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      // NOTE(review): INSERT_TABLET looks copied from insertTablet; confirm whether OperationType
+      // has a constant dedicated to this RPC and use it here.
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  /**
+   * Inserts a single record through the coordinator.
+   *
+   * @param req request carrying the session id, device path, timestamp and values
+   * @return the coordinator's execution status; a not-logged-in status when the session fails
+   *     the login check; or the status built by {@code onNPEOrUnexpectedException} when
+   *     anything throws
+   */
+  @Override
+  public TSStatus insertRecord(TSInsertRecordReq req) {
+    long startTime = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      AUDIT_LOGGER.debug(
+          "Session {} insertRecord, device {}, time {}",
+          SESSION_MANAGER.getCurrSessionId(),
+          req.getPrefixPath(),
+          req.getTimestamp());
+
+      // Step 1: turn the RPC request into a statement
+      InsertRowStatement rowStatement =
+          (InsertRowStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: hand the statement to the coordinator under a fresh query id
+      QueryId id = new QueryId(String.valueOf(SESSION_MANAGER.requestQueryId(false)));
+      ExecutionResult outcome =
+          COORDINATOR.execute(rowStatement, id, SESSION_MANAGER.getSessionInfo(req.sessionId), "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return outcome.status;
+    } catch (Exception e) {
+      // NOTE(review): INSERT_TABLET looks copied from insertTablet; confirm whether OperationType
+      // has a constant dedicated to this RPC.
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, startTime);
+    }
+  }
+
+  /**
+   * Inserts several tablets in one request through the coordinator.
+   *
+   * @param req request carrying the session id and the tablets to insert
+   * @return the coordinator's execution status; a not-logged-in status when the session fails
+   *     the login check; or the status built by {@code onNPEOrUnexpectedException} when
+   *     anything throws
+   */
+  @Override
+  public TSStatus insertTablets(TSInsertTabletsReq req) {
+    long startTime = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
+      InsertMultiTabletsStatement tabletsStatement =
+          (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: hand the statement to the coordinator under a fresh query id
+      QueryId id = new QueryId(String.valueOf(SESSION_MANAGER.requestQueryId(false)));
+      ExecutionResult outcome =
+          COORDINATOR.execute(
+              tabletsStatement, id, SESSION_MANAGER.getSessionInfo(req.sessionId), "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return outcome.status;
+    } catch (Exception e) {
+      // NOTE(review): INSERT_TABLET is reported although this RPC handles multiple tablets;
+      // confirm whether OperationType has a dedicated constant.
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, startTime);
+    }
+  }
+
+  /**
+   * Inserts one tablet through the coordinator.
+   *
+   * @param req request carrying the session id and the tablet to insert
+   * @return the coordinator's execution status; a not-logged-in status when the session fails
+   *     the login check; or the status built by {@code onNPEOrUnexpectedException} when
+   *     anything throws
+   */
+  @Override
+  public TSStatus insertTablet(TSInsertTabletReq req) {
+    long startTime = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
+      InsertTabletStatement tabletStatement =
+          (InsertTabletStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: hand the statement to the coordinator under a fresh query id
+      QueryId id = new QueryId(String.valueOf(SESSION_MANAGER.requestQueryId(false)));
+      ExecutionResult outcome =
+          COORDINATOR.execute(
+              tabletStatement, id, SESSION_MANAGER.getSessionInfo(req.sessionId), "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return outcome.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, startTime);
+    }
+  }
+
+  /**
+   * Inserts a batch of string-valued records, possibly for different devices, through the
+   * coordinator.
+   *
+   * @param req request carrying the session id and the records to insert
+   * @return the coordinator's execution status; a not-logged-in status when the session fails
+   *     the login check; or the status built by {@code onNPEOrUnexpectedException} when
+   *     anything throws
+   */
+  @Override
+  public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Only peek at the first record when there is one; otherwise an empty request would fail
+      // with IndexOutOfBoundsException solely because audit debug logging is switched on.
+      if (AUDIT_LOGGER.isDebugEnabled()
+          && !req.prefixPaths.isEmpty()
+          && !req.getTimestamps().isEmpty()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, first device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPaths.get(0),
+            req.getTimestamps().get(0));
+      }
+
+      // Step 1: turn the RPC request into a statement
+      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      // NOTE(review): INSERT_TABLET looks copied from insertTablet; confirm whether OperationType
+      // has a constant dedicated to this RPC and use it here.
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  /** No-op: logs a debug message and returns success without reading the request. */
+  @Override
+  public TSStatus testInsertTablet(TSInsertTabletReq req) {
+    LOGGER.debug("Test insert batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  /** No-op: logs a debug message and returns success without reading the request. */
+  @Override
+  public TSStatus testInsertTablets(TSInsertTabletsReq req) {
+    LOGGER.debug("Test insert batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  /** No-op: logs a debug message and returns success without reading the request. */
+  @Override
+  public TSStatus testInsertRecord(TSInsertRecordReq req) {
+    LOGGER.debug("Test insert row request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  /** No-op: logs a debug message and returns success without reading the request. */
+  @Override
+  public TSStatus testInsertStringRecord(TSInsertStringRecordReq req) {
+    LOGGER.debug("Test insert string record request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  /** No-op: logs a debug message and returns success without reading the request. */
+  @Override
+  public TSStatus testInsertRecords(TSInsertRecordsReq req) {
+    LOGGER.debug("Test insert row in batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  /** No-op: logs a debug message and returns success without reading the request. */
+  @Override
+  public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
+    LOGGER.debug("Test insert rows in batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  /** No-op: logs a debug message and returns success without reading the request. */
+  @Override
+  public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) {
+    LOGGER.debug("Test insert string records request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus deleteData(TSDeleteDataReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Asks the session manager for a statement id for the given session.
+   *
+   * @param sessionId id of the session requesting the statement id
+   * @return the id returned by the session manager
+   */
+  @Override
+  public long requestStatementId(long sessionId) {
+    return SESSION_MANAGER.requestStatementId(sessionId);
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented here: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Inserts a single string-valued record through the coordinator.
+   *
+   * @param req request carrying the session id, device path, timestamp and values
+   * @return the coordinator's execution status; a not-logged-in status when the session fails
+   *     the login check; or the status built by {@code onNPEOrUnexpectedException} when
+   *     anything throws
+   */
+  @Override
+  public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
+    long startTime = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      AUDIT_LOGGER.debug(
+          "Session {} insertRecord, device {}, time {}",
+          SESSION_MANAGER.getCurrSessionId(),
+          req.getPrefixPath(),
+          req.getTimestamp());
+
+      // Step 1: turn the RPC request into a statement
+      InsertRowStatement rowStatement =
+          (InsertRowStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: hand the statement to the coordinator under a fresh query id
+      QueryId id = new QueryId(String.valueOf(SESSION_MANAGER.requestQueryId(false)));
+      ExecutionResult outcome =
+          COORDINATOR.execute(rowStatement, id, SESSION_MANAGER.getSessionInfo(req.sessionId), "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return outcome.status;
+    } catch (Exception e) {
+      // NOTE(review): INSERT_TABLET looks copied from insertTablet; confirm whether OperationType
+      // has a constant dedicated to this RPC.
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, startTime);
+    }
+  }
+
+  /**
+   * Builds the skeleton of a query response from a dataset header.
+   *
+   * @param header source of the column names, data types, aliases, column-name index map and
+   *     ignore-timestamp flag
+   * @param queryId id the client will use to fetch further results
+   * @return a success response with the header fields and query id filled in and an empty
+   *     storage-group column list
+   */
+  private TSExecuteStatementResp createResponse(DatasetHeader header, long queryId) {
+    TSExecuteStatementResp response =
+        RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+    response.setQueryId(queryId);
+
+    // Column metadata taken from the dataset header.
+    response.setColumns(header.getRespColumns());
+    response.setAliasColumns(header.getRespAliasColumns());
+    response.setDataTypeList(header.getRespDataTypeList());
+    response.setColumnNameIndexMap(header.getColumnNameIndexMap());
+    response.setIgnoreTimeStamp(header.isIgnoreTimestamp());
+
+    // TODO deal with the sg name here
+    response.setSgColumns(new ArrayList<>());
+    return response;
+  }
+
+  /**
+   * Builds the status returned to callers whose session fails the login check.
+   *
+   * @return a {@code NOT_LOGIN_ERROR} status with a fixed explanatory message
+   */
+  private TSStatus getNotLoggedInStatus() {
+    String reason = "Log in failed. Either you are not authorized or the session has timed out.";
+    return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR, reason);
+  }
+
+  /**
+   * Records how long an operation took, when performance statistics are enabled.
+   *
+   * <p>The elapsed time since {@code startTime} is added to the {@code operation_histogram}
+   * metric and {@code operation_count} is incremented by one, both tagged with the operation's
+   * name. Nothing is recorded when performance statistics are disabled in the configuration.
+   *
+   * @param operation operation whose name tags the metrics
+   * @param startTime start of the operation, in milliseconds as returned by
+   *     {@link System#currentTimeMillis()}
+   */
+  private void addOperationLatency(Operation operation, long startTime) {
+    if (!CONFIG.isEnablePerformanceStat()) {
+      return;
+    }
+    long elapsedMillis = System.currentTimeMillis() - startTime;
+    MetricsService.getInstance()
+        .getMetricManager()
+        .histogram(
+            elapsedMillis, "operation_histogram", MetricLevel.IMPORTANT, "name", operation.getName());
+    MetricsService.getInstance()
+        .getMetricManager()
+        .count(1, "operation_count", MetricLevel.IMPORTANT, "name", operation.getName());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 6291b14875..8866bf361d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,16 +37,17 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Coordinator;
-import org.apache.iotdb.db.mpp.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
-import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
-import org.apache.iotdb.db.mpp.sql.statement.crud.*;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
 import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
@@ -79,47 +80,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
-import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -161,11 +122,9 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 /** Thrift RPC implementation at server side. */
 public class TSServiceImpl implements TSIService.Iface {
 
-  private static final Coordinator coordinator = Coordinator.getInstance();
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
-  public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
-
-  protected class QueryTask implements Callable<TSExecuteStatementResp> {
+  private class QueryTask implements Callable<TSExecuteStatementResp> {
 
     private PhysicalPlan plan;
     private final long queryStartTime;
@@ -241,7 +200,7 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  protected class FetchResultsTask implements Callable<TSFetchResultsResp> {
+  private class FetchResultsTask implements Callable<TSFetchResultsResp> {
 
     private final long sessionId;
     private final long queryId;
@@ -1228,46 +1187,6 @@ public class TSServiceImpl implements TSIService.Iface {
         allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
   }
 
-  public TSStatus insertRecordsV2(TSInsertRecordsReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, first device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPaths.get(0),
-            req.getTimestamps().get(0));
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   private TSStatus judgeFinalTsStatus(
       boolean allCheckSuccess,
       TSStatus executeTsStatus,
@@ -1335,47 +1254,6 @@ public class TSServiceImpl implements TSIService.Iface {
     return resp;
   }
 
-  public TSStatus insertRecordsOfOneDeviceV2(TSInsertRecordsOfOneDeviceReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPath,
-            req.getTimestamps().get(0));
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertRowsOfOneDeviceStatement statement =
-          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1434,47 +1312,6 @@ public class TSServiceImpl implements TSIService.Iface {
         allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.timestamps.size());
   }
 
-  public TSStatus insertStringRecordsOfOneDeviceV2(TSInsertStringRecordsOfOneDeviceReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPath,
-            req.getTimestamps().get(0));
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertRowsOfOneDeviceStatement statement =
-          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1528,44 +1365,6 @@ public class TSServiceImpl implements TSIService.Iface {
         allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
   }
 
-  public TSStatus insertStringRecordsV2(TSInsertStringRecordsReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, first device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPaths.get(0),
-            req.getTimestamps().get(0));
-      }
-
-      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
-
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   private void addMeasurementAndValue(
       InsertRowPlan insertRowPlan, List<String> measurements, List<String> values) {
     List<String> newMeasurements = new ArrayList<>(measurements.size());
@@ -1656,43 +1455,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertRecordV2(TSInsertRecordReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      AUDIT_LOGGER.debug(
-          "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
-          req.getPrefixPath(),
-          req.getTimestamp());
-
-      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
     try {
@@ -1724,43 +1486,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertStringRecordV2(TSInsertStringRecordReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      AUDIT_LOGGER.debug(
-          "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
-          req.getPrefixPath(),
-          req.getTimestamp());
-
-      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus deleteData(TSDeleteDataReq req) {
     try {
@@ -1819,39 +1544,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertTabletV2(TSInsertTabletReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
-      InsertTabletStatement statement =
-          (InsertTabletStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertTablets(TSInsertTabletsReq req) {
     long t1 = System.currentTimeMillis();
@@ -1874,38 +1566,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertTabletsV2(TSInsertTabletsReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertMultiTabletsStatement statement =
-          (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   private InsertTabletPlan constructInsertTabletPlan(TSInsertTabletsReq req, int i)
       throws IllegalPathException {
     InsertTabletPlan insertTabletPlan =
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 29efa1a4c7..455c274f41 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.utils;
 
-import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.execution.IQueryExecution;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -177,7 +177,7 @@ public class QueryDataSetUtils {
   }
 
   public static TSQueryDataSet convertTsBlockByFetchSize(
-      QueryExecution queryExecution, int fetchSize) throws IOException {
+      IQueryExecution queryExecution, int fetchSize) throws IOException {
     int columnNum = queryExecution.getOutputValueColumnCount();
     TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
     // one time column and each value column has an actual value buffer and a bitmap value to