You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/14 08:20:13 UTC

[iotdb] 01/01: add DataNodeTSIServiceImpl

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 01a35ea6d42ad114f13314ef57fba85174d4e820
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Apr 14 16:19:52 2022 +0800

    add DataNodeTSIServiceImpl
---
 .../org/apache/iotdb/db/qp/sql/SqlLexer.tokens     | 245 +++++++
 .../iotdb/commons/service/ThriftService.java       |   8 +-
 .../resources/conf/iotdb-engine.properties         |   3 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   3 +
 .../iotdb/db/mpp/common/MPPQueryContext.java       |  10 +-
 .../iotdb/db/mpp/common/header/DatasetHeader.java  |  13 +-
 .../iotdb/db/mpp/execution/ConfigExecution.java    |  44 +-
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  18 +-
 .../iotdb/db/mpp/execution/IQueryExecution.java    |  13 +
 .../iotdb/db/mpp/execution/QueryExecution.java     |  28 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  | 334 +++++-----
 .../org/apache/iotdb/db/service/RPCService.java    |   6 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 724 +++++++++++++++++++++
 .../db/service/thrift/impl/TSServiceImpl.java      | 364 +----------
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |   4 +-
 15 files changed, 1261 insertions(+), 556 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.tokens b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.tokens
new file mode 100644
index 0000000000..30a7c1b297
--- /dev/null
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.tokens
@@ -0,0 +1,245 @@
+WS=1
+ADD=2
+AFTER=3
+ALIAS=4
+ALIGN=5
+ALIGNED=6
+ALL=7
+ALTER=8
+ANY=9
+APPEND=10
+AS=11
+ASC=12
+ATTRIBUTES=13
+AUTOREGISTER=14
+BEFORE=15
+BEGIN=16
+BOUNDARY=17
+BY=18
+CACHE=19
+CHILD=20
+CLEAR=21
+COMPRESSION=22
+COMPRESSOR=23
+CONCAT=24
+CONFIGURATION=25
+CONTINUOUS=26
+COUNT=27
+CONTAIN=28
+CQ=29
+CQS=30
+CREATE=31
+DATATYPE=32
+DEBUG=33
+DELETE=34
+DESC=35
+DESCRIBE=36
+DEVICE=37
+DEVICES=38
+DISABLE=39
+DROP=40
+ENCODING=41
+END=42
+EVERY=43
+EXPLAIN=44
+FILL=45
+FLUSH=46
+FOR=47
+FROM=48
+FULL=49
+FUNCTION=50
+FUNCTIONS=51
+GLOBAL=52
+GRANT=53
+GROUP=54
+INDEX=55
+INFO=56
+INSERT=57
+INTO=58
+KILL=59
+LABEL=60
+LAST=61
+LATEST=62
+LEVEL=63
+LIKE=64
+LIMIT=65
+LINEAR=66
+LINK=67
+LIST=68
+LOAD=69
+LOCK=70
+MERGE=71
+METADATA=72
+NODES=73
+NOW=74
+OF=75
+OFF=76
+OFFSET=77
+ON=78
+ORDER=79
+PARTITION=80
+PASSWORD=81
+PATHS=82
+PREVIOUS=83
+PREVIOUSUNTILLAST=84
+PRIVILEGES=85
+PROCESSLIST=86
+PROPERTY=87
+PRUNE=88
+QUERIES=89
+QUERY=90
+READONLY=91
+REGEXP=92
+REMOVE=93
+RENAME=94
+RESAMPLE=95
+RESOURCE=96
+REVOKE=97
+ROLE=98
+ROOT=99
+SCHEMA=100
+SELECT=101
+SET=102
+SETTLE=103
+SGLEVEL=104
+SHOW=105
+SLIMIT=106
+SOFFSET=107
+STORAGE=108
+START=109
+STOP=110
+SYSTEM=111
+TAGS=112
+TASK=113
+TEMPLATE=114
+TEMPLATES=115
+TIME=116
+TIMESERIES=117
+TIMESTAMP=118
+TO=119
+TOLERANCE=120
+TOP=121
+TRACING=122
+TRIGGER=123
+TRIGGERS=124
+TTL=125
+UNLINK=126
+UNLOAD=127
+UNSET=128
+UPDATE=129
+UPSERT=130
+USER=131
+USING=132
+VALUES=133
+VERIFY=134
+VERSION=135
+WATERMARK_EMBEDDING=136
+WHERE=137
+WITH=138
+WITHOUT=139
+WRITABLE=140
+DATATYPE_VALUE=141
+BOOLEAN=142
+DOUBLE=143
+FLOAT=144
+INT32=145
+INT64=146
+TEXT=147
+ENCODING_VALUE=148
+DICTIONARY=149
+DIFF=150
+GORILLA=151
+PLAIN=152
+REGULAR=153
+RLE=154
+TS_2DIFF=155
+ZIGZAG=156
+FREQ=157
+COMPRESSOR_VALUE=158
+GZIP=159
+LZ4=160
+SNAPPY=161
+UNCOMPRESSED=162
+PRIVILEGE_VALUE=163
+SET_STORAGE_GROUP=164
+DELETE_STORAGE_GROUP=165
+CREATE_TIMESERIES=166
+INSERT_TIMESERIES=167
+READ_TIMESERIES=168
+DELETE_TIMESERIES=169
+CREATE_USER=170
+DELETE_USER=171
+MODIFY_PASSWORD=172
+LIST_USER=173
+GRANT_USER_PRIVILEGE=174
+REVOKE_USER_PRIVILEGE=175
+GRANT_USER_ROLE=176
+REVOKE_USER_ROLE=177
+CREATE_ROLE=178
+DELETE_ROLE=179
+LIST_ROLE=180
+GRANT_ROLE_PRIVILEGE=181
+REVOKE_ROLE_PRIVILEGE=182
+CREATE_FUNCTION=183
+DROP_FUNCTION=184
+CREATE_TRIGGER=185
+DROP_TRIGGER=186
+START_TRIGGER=187
+STOP_TRIGGER=188
+CREATE_CONTINUOUS_QUERY=189
+DROP_CONTINUOUS_QUERY=190
+MINUS=191
+PLUS=192
+DIV=193
+MOD=194
+OPERATOR_DEQ=195
+OPERATOR_SEQ=196
+OPERATOR_GT=197
+OPERATOR_GTE=198
+OPERATOR_LT=199
+OPERATOR_LTE=200
+OPERATOR_NEQ=201
+OPERATOR_IN=202
+OPERATOR_AND=203
+OPERATOR_OR=204
+OPERATOR_NOT=205
+OPERATOR_CONTAINS=206
+DOT=207
+COMMA=208
+SEMI=209
+STAR=210
+DOUBLE_STAR=211
+LR_BRACKET=212
+RR_BRACKET=213
+LS_BRACKET=214
+RS_BRACKET=215
+STRING_LITERAL=216
+DURATION_LITERAL=217
+DATETIME_LITERAL=218
+INTEGER_LITERAL=219
+EXPONENT_NUM_PART=220
+BOOLEAN_LITERAL=221
+NULL_LITERAL=222
+NAN_LITERAL=223
+ID=224
+QUTOED_ID_IN_NODE_NAME=225
+QUTOED_ID=226
+'-'=191
+'+'=192
+'/'=193
+'%'=194
+'=='=195
+'='=196
+'>'=197
+'>='=198
+'<'=199
+'<='=200
+'.'=207
+','=208
+';'=209
+'*'=210
+'**'=211
+'('=212
+')'=213
+'['=214
+']'=215
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
index e792d88c81..30d75a94eb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
@@ -26,6 +26,7 @@ import org.apache.thrift.TProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.concurrent.CountDownLatch;
 
 public abstract class ThriftService implements IService {
@@ -88,7 +89,8 @@ public abstract class ThriftService implements IService {
   }
 
   public abstract void initTProcessor()
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException;
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+          NoSuchMethodException, InvocationTargetException;
 
   public abstract void initThriftServiceThread()
       throws IllegalAccessException, InstantiationException, ClassNotFoundException;
@@ -125,7 +127,9 @@ public abstract class ThriftService implements IService {
     } catch (InterruptedException
         | ClassNotFoundException
         | IllegalAccessException
-        | InstantiationException e) {
+        | InstantiationException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
       Thread.currentThread().interrupt();
       throw new StartupException(this.getID().getName(), e.getMessage());
     }
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 8a6673dd85..c3b4f4fe2e 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -27,6 +27,9 @@ rpc_address=0.0.0.0
 # Datatype: int
 rpc_port=6667
 
+# Client RPC Service Impl Class Name: TSServiceImpl or DataNodeTSIServiceImpl
+# rpc_class_name=TSServiceImpl
+
 # Datatype: int
 mpp_port=7777
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5ed958b579..1eeab2bfd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -167,6 +167,9 @@ public class IoTDBDescriptor {
           Integer.parseInt(
               properties.getProperty("rpc_port", Integer.toString(conf.getRpcPort()))));
 
+      conf.setRpcImplClassName(
+          properties.getProperty("rpc_class_name", conf.getRpcImplClassName()).trim());
+
       conf.setMppPort(
           Integer.parseInt(
               properties.getProperty("mpp_port", Integer.toString(conf.getRpcPort()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index def89dba39..b449bdf9e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -29,7 +29,7 @@ public class MPPQueryContext {
   private String sql;
   private QueryId queryId;
   private SessionInfo session;
-  private QueryType queryType;
+  private QueryType queryType = QueryType.READ;
 
   private Endpoint hostEndpoint;
   private ResultNodeContext resultNodeContext;
@@ -38,12 +38,10 @@ public class MPPQueryContext {
     this.queryId = queryId;
   }
 
-  public MPPQueryContext(
-      String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
+  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, Endpoint hostEndpoint) {
     this.sql = sql;
     this.queryId = queryId;
     this.session = session;
-    this.queryType = type;
     this.hostEndpoint = hostEndpoint;
     this.resultNodeContext = new ResultNodeContext(queryId);
   }
@@ -56,6 +54,10 @@ public class MPPQueryContext {
     return queryType;
   }
 
+  public void setQueryType(QueryType queryType) {
+    this.queryType = queryType;
+  }
+
   public Endpoint getHostEndpoint() {
     return hostEndpoint;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
index 32a236feae..ee3e52d218 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeader.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.mpp.common.header;
 
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
 import com.google.common.primitives.Bytes;
 
 import java.util.*;
@@ -51,10 +49,6 @@ public class DatasetHeader {
     return isIgnoreTimestamp;
   }
 
-  public Map<String, Integer> getColumnToTsBlockIndexMap() {
-    return columnToTsBlockIndexMap;
-  }
-
   public void setColumnToTsBlockIndexMap(List<String> outputColumnNames) {
     this.columnToTsBlockIndexMap = new HashMap<>();
     for (int i = 0; i < outputColumnNames.size(); i++) {
@@ -66,8 +60,11 @@ public class DatasetHeader {
     return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
   }
 
-  public List<TSDataType> getRespDataTypeList() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+  public List<String> getRespDataTypeList() {
+    return columnHeaders.stream()
+        .map(ColumnHeader::getColumnType)
+        .map(Objects::toString)
+        .collect(Collectors.toList());
   }
 
   public List<Byte> getRespAliasColumns() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java
index 3d8b5dcdd5..c9cb515e71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ConfigExecution.java
@@ -20,15 +20,19 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import jersey.repackaged.com.google.common.util.concurrent.SettableFuture;
+import org.jetbrains.annotations.NotNull;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -37,12 +41,12 @@ import static com.google.common.base.Throwables.throwIfInstanceOf;
 
 public class ConfigExecution implements IQueryExecution {
 
-  private MPPQueryContext context;
-  private Statement statement;
-  private ExecutorService executor;
+  private final MPPQueryContext context;
+  private final Statement statement;
+  private final ExecutorService executor;
 
-  private QueryStateMachine stateMachine;
-  private SettableFuture<Boolean> result;
+  private final QueryStateMachine stateMachine;
+  private final SettableFuture<Boolean> result;
 
   public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
     this.context = context;
@@ -67,7 +71,7 @@ public class ConfigExecution implements IQueryExecution {
             }
 
             @Override
-            public void onFailure(Throwable throwable) {
+            public void onFailure(@NotNull Throwable throwable) {
               fail(throwable);
             }
           },
@@ -101,6 +105,34 @@ public class ConfigExecution implements IQueryExecution {
     }
   }
 
+  @Override
+  public TsBlock getBatchResult() {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean hasNextResult() {
+    return false;
+  }
+
+  @Override
+  public int getOutputValueColumnCount() {
+    // TODO
+    return 0;
+  }
+
+  @Override
+  public DatasetHeader getDatasetHeader() {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isQuery() {
+    return context.getQueryType() == QueryType.READ;
+  }
+
   // TODO: consider a more suitable implementation for it
   // Generate the corresponding IConfigTask by statement.
   // Each type of statement will has a ConfigTask
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4749394259..784b2b5999 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -24,9 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import org.apache.commons.lang3.Validate;
 
@@ -50,12 +48,12 @@ public class Coordinator {
           IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
           IoTDBDescriptor.getInstance().getConfig().getMppPort());
 
-  private ExecutorService executor;
-  private ScheduledExecutorService scheduledExecutor;
+  private final ExecutorService executor;
+  private final ScheduledExecutorService scheduledExecutor;
 
   private static final Coordinator INSTANCE = new Coordinator();
 
-  private ConcurrentHashMap<QueryId, QueryExecution> queryExecutionMap;
+  private final ConcurrentHashMap<QueryId, IQueryExecution> queryExecutionMap;
 
   private Coordinator() {
     this.queryExecutionMap = new ConcurrentHashMap<>();
@@ -68,11 +66,11 @@ public class Coordinator {
   }
 
   public ExecutionResult execute(
-      Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
+      Statement statement, QueryId queryId, SessionInfo session, String sql) {
 
     QueryExecution execution =
         createQueryExecution(
-            statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
+            statement, new MPPQueryContext(sql, queryId, session, getHostEndpoint()));
     queryExecutionMap.put(queryId, execution);
 
     execution.start();
@@ -80,10 +78,10 @@ public class Coordinator {
     return execution.getStatus();
   }
 
-  public TsBlock getResultSet(QueryId queryId) {
-    QueryExecution execution = queryExecutionMap.get(queryId);
+  public IQueryExecution getQueryExecution(QueryId queryId) {
+    IQueryExecution execution = queryExecutionMap.get(queryId);
     Validate.notNull(execution, "invalid queryId %s", queryId.getId());
-    return execution.getBatchResult();
+    return execution;
   }
 
   // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
index 3f9fa05ea8..2e0bd9c76d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
 public interface IQueryExecution {
 
   void start();
@@ -26,4 +29,14 @@ public interface IQueryExecution {
   void stop();
 
   ExecutionResult getStatus();
+
+  TsBlock getBatchResult();
+
+  boolean hasNextResult();
+
+  int getOutputValueColumnCount();
+
+  DatasetHeader getDatasetHeader();
+
+  boolean isQuery();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index a5dbeaf21f..885e822ea3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -21,10 +21,12 @@ package org.apache.iotdb.db.mpp.execution;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
@@ -55,18 +57,18 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
  * corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
  */
 public class QueryExecution implements IQueryExecution {
-  private MPPQueryContext context;
+  private final MPPQueryContext context;
   private IScheduler scheduler;
-  private QueryStateMachine stateMachine;
+  private final QueryStateMachine stateMachine;
 
-  private List<PlanOptimizer> planOptimizers;
+  private final List<PlanOptimizer> planOptimizers;
 
   private final Analysis analysis;
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
 
-  private ExecutorService executor;
-  private ScheduledExecutorService scheduledExecutor;
+  private final ExecutorService executor;
+  private final ScheduledExecutorService scheduledExecutor;
 
   // The result of QueryExecution will be written to the DataBlockManager in current Node.
   // We use this SourceHandle to fetch the TsBlock from it.
@@ -153,6 +155,7 @@ public class QueryExecution implements IQueryExecution {
    * DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
    * implemented with DataStreamManager)
    */
+  @Override
   public TsBlock getBatchResult() {
     try {
       initResultHandle();
@@ -171,6 +174,7 @@ public class QueryExecution implements IQueryExecution {
   }
 
   /** @return true if there is more tsblocks, otherwise false */
+  @Override
   public boolean hasNextResult() {
     try {
       initResultHandle();
@@ -182,9 +186,14 @@ public class QueryExecution implements IQueryExecution {
   }
 
   /** return the result column count without the time column */
+  @Override
   public int getOutputValueColumnCount() {
-    // TODO need return the actual size while there exists output columns in Analysis
-    return 1;
+    return analysis.getRespDatasetHeader().getColumnHeaders().size();
+  }
+
+  @Override
+  public DatasetHeader getDatasetHeader() {
+    return analysis.getRespDatasetHeader();
   }
 
   /**
@@ -242,4 +251,9 @@ public class QueryExecution implements IQueryExecution {
   public LogicalQueryPlan getLogicalPlan() {
     return logicalPlan;
   }
+
+  @Override
+  public boolean isQuery() {
+    return context.getQueryType() == QueryType.READ;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 5eca107403..29090cf0bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -173,6 +173,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
       // TODO: do analyze for insert statement
+      context.setQueryType(QueryType.WRITE);
       Analysis analysis = new Analysis();
       analysis.setStatement(insertStatement);
       return analysis;
@@ -234,6 +235,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsertTablet(
         InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
       dataPartitionQueryParam.setTimePartitionSlotList(
           insertTabletStatement.getTimePartitionSlots());
@@ -264,172 +266,9 @@ public class Analyzer {
       return analysis;
     }
 
-    @Override
-    public Analysis visitShowTimeSeries(
-        ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
-      SchemaPartition schemaPartitionInfo =
-          partitionFetcher.fetchSchemaPartitionInfo(
-              showTimeSeriesStatement.getPathPattern().getDevice());
-      Analysis analysis = new Analysis();
-      analysis.setStatement(showTimeSeriesStatement);
-      analysis.setSchemaPartitionInfo(schemaPartitionInfo);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitShowDevices(
-        ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
-      SchemaPartition schemaPartitionInfo =
-          partitionFetcher.fetchSchemaPartitionInfo(
-              showDevicesStatement.getPathPattern().getFullPath());
-      Analysis analysis = new Analysis();
-      analysis.setStatement(showDevicesStatement);
-      analysis.setSchemaPartitionInfo(schemaPartitionInfo);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitCreateUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitCreateRole(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitAlterUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitGrantUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitGrantRole(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitGrantRoleToUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitRevokeUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitRevokeRole(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitRevokeRoleFromUser(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitDropUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitDropRole(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListUser(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListRole(AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListPrivilegesUser(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListPrivilegesRole(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListUserPrivileges(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListRolePrivileges(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListAllRoleOfUser(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitListAllUserOfRole(
-        AuthorStatement authorStatement, MPPQueryContext context) {
-      Analysis analysis = new Analysis();
-      analysis.setStatement(authorStatement);
-      return analysis;
-    }
-
     @Override
     public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
       dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
       dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
@@ -470,6 +309,7 @@ public class Analyzer {
     public Analysis visitInsertRows(
         InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
       // TODO remove duplicate
+      context.setQueryType(QueryType.WRITE);
       List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
       for (InsertRowStatement insertRowStatement :
           insertRowsStatement.getInsertRowStatementList()) {
@@ -523,6 +363,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsertMultiTablets(
         InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       // TODO remove duplicate
       List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
       for (InsertTabletStatement insertTabletStatement :
@@ -570,6 +411,7 @@ public class Analyzer {
     @Override
     public Analysis visitInsertRowsOfOneDevice(
         InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
       dataPartitionQueryParam.setDevicePath(
           insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
@@ -615,5 +457,169 @@ public class Analyzer {
 
       return analysis;
     }
+
+    @Override
+    public Analysis visitShowTimeSeries(
+        ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
+      SchemaPartition schemaPartitionInfo =
+          partitionFetcher.fetchSchemaPartitionInfo(
+              showTimeSeriesStatement.getPathPattern().getDevice());
+      Analysis analysis = new Analysis();
+      analysis.setStatement(showTimeSeriesStatement);
+      analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitShowDevices(
+        ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
+      SchemaPartition schemaPartitionInfo =
+          partitionFetcher.fetchSchemaPartitionInfo(
+              showDevicesStatement.getPathPattern().getFullPath());
+      Analysis analysis = new Analysis();
+      analysis.setStatement(showDevicesStatement);
+      analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitCreateUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitCreateRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitAlterUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitGrantUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitGrantRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitGrantRoleToUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitRevokeUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitRevokeRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitRevokeRoleFromUser(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitDropUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitDropRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitListUser(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitListRole(AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitListPrivilegesUser(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitListPrivilegesRole(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitListUserPrivileges(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitListRolePrivileges(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitListAllRoleOfUser(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitListAllUserOfRole(
+        AuthorStatement authorStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
+      analysis.setStatement(authorStatement);
+      return analysis;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index dc0ee3987c..a376eaece0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
 
+import java.lang.reflect.InvocationTargetException;
+
 /** A service to handle jdbc request from client. */
 public class RPCService extends ThriftService implements RPCServiceMBean {
 
@@ -47,10 +49,12 @@ public class RPCService extends ThriftService implements RPCServiceMBean {
 
   @Override
   public void initTProcessor()
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+          NoSuchMethodException, InvocationTargetException {
     impl =
         (TSServiceImpl)
             Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName())
+                .getDeclaredConstructor()
                 .newInstance();
     initSyncedServiceImpl(null);
     if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
new file mode 100644
index 0000000000..af372e4319
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -0,0 +1,724 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.execution.Coordinator;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.service.metrics.MetricsService;
+import org.apache.iotdb.db.service.metrics.Operation;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.*;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_TIME_MANAGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+
+public class DataNodeTSIServiceImpl implements TSIService.Iface {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeTSIServiceImpl.class);
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  @Override
+  public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
+    IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
+    BasicOpenSessionResp openSessionResp =
+        SESSION_MANAGER.openSession(
+            req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
+    TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
+    TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
+    return resp.setSessionId(openSessionResp.getSessionId());
+  }
+
+  private IoTDBConstant.ClientVersion parseClientVersion(TSOpenSessionReq req) {
+    Map<String, String> configuration = req.configuration;
+    if (configuration != null && configuration.containsKey("version")) {
+      return IoTDBConstant.ClientVersion.valueOf(configuration.get("version"));
+    }
+    return IoTDBConstant.ClientVersion.V_0_12;
+  }
+
+  @Override
+  public TSStatus closeSession(TSCloseSessionReq req) {
+    return new TSStatus(
+        !SESSION_MANAGER.closeSession(req.sessionId)
+            ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
+            : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+  }
+
+  @Override
+  public TSStatus cancelOperation(TSCancelOperationReq req) {
+    // TODO implement
+    return RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "Cancellation is not implemented");
+  }
+
+  @Override
+  public TSStatus closeOperation(TSCloseOperationReq req) {
+    return SESSION_MANAGER.closeOperation(
+        req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+  }
+
+  @Override
+  public TSGetTimeZoneResp getTimeZone(long sessionId) {
+    try {
+      ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+      return new TSGetTimeZoneResp(
+          RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
+          zoneId != null ? zoneId.toString() : "Unknown time zone");
+    } catch (Exception e) {
+      return new TSGetTimeZoneResp(
+          onNPEOrUnexpectedException(
+              e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR),
+          "Unknown time zone");
+    }
+  }
+
+  @Override
+  public TSStatus setTimeZone(TSSetTimeZoneReq req) {
+    try {
+      SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR);
+    }
+  }
+
+  @Override
+  public ServerProperties getProperties() {
+    ServerProperties properties = new ServerProperties();
+    properties.setVersion(IoTDBConstant.VERSION);
+    LOGGER.info("IoTDB server version: {}", IoTDBConstant.VERSION);
+    properties.setSupportedTimeAggregationOperations(new ArrayList<>());
+    properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME);
+    properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME);
+    properties.setTimestampPrecision(
+        IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
+    properties.setMaxConcurrentClientNum(
+        IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum());
+    properties.setWatermarkSecretKey(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkSecretKey());
+    properties.setWatermarkBitString(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkBitString());
+    properties.setWatermarkParamMarkRate(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMarkRate());
+    properties.setWatermarkParamMaxRightBit(
+        IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMaxRightBit());
+    properties.setIsReadOnly(IoTDBDescriptor.getInstance().getConfig().isReadOnly());
+    properties.setThriftMaxFrameSize(
+        IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize());
+    return properties;
+  }
+
+  @Override
+  public TSStatus setStorageGroup(long sessionId, String storageGroup) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus deleteTimeseries(long sessionId, List<String> path) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroup) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
+    String statement = req.getStatement();
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.currentTimeMillis();
+    Statement s =
+        StatementGenerator.createStatement(
+            statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+    QUERY_FREQUENCY_RECORDER.incrementAndGet();
+    AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
+
+    try {
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      QueryId id = new QueryId(String.valueOf(queryId));
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(s, id, SESSION_MANAGER.getSessionInfo(req.sessionId), statement);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("");
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
+
+      TSExecuteStatementResp resp;
+      if (queryExecution.isQuery()) {
+        resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+        resp.setStatus(result.status);
+        resp.setQueryDataSet(
+            QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
+      } else {
+        resp = RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      return resp;
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+      }
+    }
+  }
+
+  @Override
+  public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
+    return executeStatement(req);
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatement(req);
+  }
+
+  @Override
+  public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
+      }
+
+      TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
+      TSQueryDataSet result =
+          QueryDataSetUtils.convertTsBlockByFetchSize(
+              COORDINATOR.getQueryExecution(new QueryId(String.valueOf(req.queryId))),
+              req.fetchSize);
+      boolean hasResultSet = result.bufferForTime().limit() != 0;
+
+      resp.setHasResultSet(hasResultSet);
+      resp.setQueryDataSet(result);
+      resp.setIsAlign(true);
+
+      QUERY_TIME_MANAGER.unRegisterQuery(req.queryId, false);
+      return resp;
+
+    } catch (Exception e) {
+      return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+    }
+  }
+
+  @Override
+  public TSStatus insertRecords(TSInsertRecordsReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, first device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPaths.get(0),
+            req.getTimestamps().get(0));
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
+      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPath,
+            req.getTimestamps().get(0));
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
+      InsertRowsOfOneDeviceStatement statement =
+          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPath,
+            req.getTimestamps().get(0));
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
+      InsertRowsOfOneDeviceStatement statement =
+          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertRecord(TSInsertRecordReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      AUDIT_LOGGER.debug(
+          "Session {} insertRecord, device {}, time {}",
+          SESSION_MANAGER.getCurrSessionId(),
+          req.getPrefixPath(),
+          req.getTimestamp());
+
+      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertTablets(TSInsertTabletsReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
+      InsertMultiTabletsStatement statement =
+          (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertTablet(TSInsertTabletReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
+      InsertTabletStatement statement =
+          (InsertTabletStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session {} insertRecords, first device {}, first time {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.prefixPaths.get(0),
+            req.getTimestamps().get(0));
+      }
+
+      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  @Override
+  public TSStatus testInsertTablet(TSInsertTabletReq req) {
+    LOGGER.debug("Test insert batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertTablets(TSInsertTabletsReq req) {
+    LOGGER.debug("Test insert batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertRecord(TSInsertRecordReq req) {
+    LOGGER.debug("Test insert row request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertStringRecord(TSInsertStringRecordReq req) {
+    LOGGER.debug("Test insert string record request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertRecords(TSInsertRecordsReq req) {
+    LOGGER.debug("Test insert row in batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
+    LOGGER.debug("Test insert rows in batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) {
+    LOGGER.debug("Test insert string records request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TSStatus deleteData(TSDeleteDataReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long requestStatementId(long sessionId) {
+    return SESSION_MANAGER.requestStatementId(sessionId);
+  }
+
+  @Override
+  public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      AUDIT_LOGGER.debug(
+          "Session {} insertRecord, device {}, time {}",
+          SESSION_MANAGER.getCurrSessionId(),
+          req.getPrefixPath(),
+          req.getTimestamp());
+
+      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
+  private TSExecuteStatementResp createResponse(DatasetHeader header, long queryId) {
+    TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+    resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
+    // TODO deal with the sg name here
+    resp.setSgColumns(new ArrayList<>());
+    resp.setColumns(header.getRespColumns());
+    resp.setDataTypeList(header.getRespDataTypeList());
+    resp.setAliasColumns(header.getRespAliasColumns());
+    resp.setIgnoreTimeStamp(header.isIgnoreTimestamp());
+    resp.setQueryId(queryId);
+    return resp;
+  }
+
+  private TSStatus getNotLoggedInStatus() {
+    return RpcUtils.getStatus(
+        TSStatusCode.NOT_LOGIN_ERROR,
+        "Log in failed. Either you are not authorized or the session has timed out.");
+  }
+
+  /** Add stat of operation into metrics */
+  private void addOperationLatency(Operation operation, long startTime) {
+    if (CONFIG.isEnablePerformanceStat()) {
+      MetricsService.getInstance()
+          .getMetricManager()
+          .histogram(
+              System.currentTimeMillis() - startTime,
+              "operation_histogram",
+              MetricLevel.IMPORTANT,
+              "name",
+              operation.getName());
+      MetricsService.getInstance()
+          .getMetricManager()
+          .count(1, "operation_count", MetricLevel.IMPORTANT, "name", operation.getName());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 6291b14875..8866bf361d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,16 +37,17 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.Coordinator;
-import org.apache.iotdb.db.mpp.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
-import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
-import org.apache.iotdb.db.mpp.sql.statement.crud.*;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
 import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
@@ -79,47 +80,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
-import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -161,11 +122,9 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 /** Thrift RPC implementation at server side. */
 public class TSServiceImpl implements TSIService.Iface {
 
-  private static final Coordinator coordinator = Coordinator.getInstance();
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
-  public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
-
-  protected class QueryTask implements Callable<TSExecuteStatementResp> {
+  private class QueryTask implements Callable<TSExecuteStatementResp> {
 
     private PhysicalPlan plan;
     private final long queryStartTime;
@@ -241,7 +200,7 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  protected class FetchResultsTask implements Callable<TSFetchResultsResp> {
+  private class FetchResultsTask implements Callable<TSFetchResultsResp> {
 
     private final long sessionId;
     private final long queryId;
@@ -1228,46 +1187,6 @@ public class TSServiceImpl implements TSIService.Iface {
         allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
   }
 
-  public TSStatus insertRecordsV2(TSInsertRecordsReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, first device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPaths.get(0),
-            req.getTimestamps().get(0));
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   private TSStatus judgeFinalTsStatus(
       boolean allCheckSuccess,
       TSStatus executeTsStatus,
@@ -1335,47 +1254,6 @@ public class TSServiceImpl implements TSIService.Iface {
     return resp;
   }
 
-  public TSStatus insertRecordsOfOneDeviceV2(TSInsertRecordsOfOneDeviceReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPath,
-            req.getTimestamps().get(0));
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertRowsOfOneDeviceStatement statement =
-          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1434,47 +1312,6 @@ public class TSServiceImpl implements TSIService.Iface {
         allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.timestamps.size());
   }
 
-  public TSStatus insertStringRecordsOfOneDeviceV2(TSInsertStringRecordsOfOneDeviceReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPath,
-            req.getTimestamps().get(0));
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertRowsOfOneDeviceStatement statement =
-          (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
     if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1528,44 +1365,6 @@ public class TSServiceImpl implements TSIService.Iface {
         allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
   }
 
-  public TSStatus insertStringRecordsV2(TSInsertStringRecordsReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session {} insertRecords, first device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.prefixPaths.get(0),
-            req.getTimestamps().get(0));
-      }
-
-      InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
-
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   private void addMeasurementAndValue(
       InsertRowPlan insertRowPlan, List<String> measurements, List<String> values) {
     List<String> newMeasurements = new ArrayList<>(measurements.size());
@@ -1656,43 +1455,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertRecordV2(TSInsertRecordReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      AUDIT_LOGGER.debug(
-          "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
-          req.getPrefixPath(),
-          req.getTimestamp());
-
-      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
     try {
@@ -1724,43 +1486,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertStringRecordV2(TSInsertStringRecordReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      AUDIT_LOGGER.debug(
-          "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
-          req.getPrefixPath(),
-          req.getTimestamp());
-
-      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus deleteData(TSDeleteDataReq req) {
     try {
@@ -1819,39 +1544,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertTabletV2(TSInsertTabletReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
-      InsertTabletStatement statement =
-          (InsertTabletStatement) StatementGenerator.createStatement(req);
-
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   @Override
   public TSStatus insertTablets(TSInsertTabletsReq req) {
     long t1 = System.currentTimeMillis();
@@ -1874,38 +1566,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
-  public TSStatus insertTabletsV2(TSInsertTabletsReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
-        return getNotLoggedInStatus();
-      }
-
-      // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
-      InsertMultiTabletsStatement statement =
-          (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
-      // Step 2: call the coordinator
-      long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
-          coordinator.execute(
-              statement,
-              new QueryId(String.valueOf(queryId)),
-              QueryType.WRITE,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
-
-      // TODO(INSERT) do this check in analyze
-      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
-      // req.getSessionId());
-      return result.status;
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
   private InsertTabletPlan constructInsertTabletPlan(TSInsertTabletsReq req, int i)
       throws IllegalPathException {
     InsertTabletPlan insertTabletPlan =
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 29efa1a4c7..455c274f41 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.utils;
 
-import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.execution.IQueryExecution;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -177,7 +177,7 @@ public class QueryDataSetUtils {
   }
 
   public static TSQueryDataSet convertTsBlockByFetchSize(
-      QueryExecution queryExecution, int fetchSize) throws IOException {
+      IQueryExecution queryExecution, int fetchSize) throws IOException {
     int columnNum = queryExecution.getOutputValueColumnCount();
     TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
     // one time column and each value column has an actual value buffer and a bitmap value to