You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/15 07:46:49 UTC

[iotdb] branch master updated: [IOTDB-1923] Separate the request unpacking and execution processing logic of TSServiceImpl (#4316)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c81d6bb  [IOTDB-1923] Separate the request unpacking and execution processing logic of TSServiceImpl (#4316)
c81d6bb is described below

commit c81d6bb30b755de8a6089b1b0fdd8f5ff2eb7e72
Author: Xieqijun <44...@users.noreply.github.com>
AuthorDate: Mon Nov 15 15:46:20 2021 +0800

    [IOTDB-1923] Separate the request unpacking and execution processing logic of TSServiceImpl (#4316)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   1 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 410 +++------------------
 .../db/service/basic/BasicOpenSessionResp.java     |  34 ++
 .../db/service/basic/BasicServiceProvider.java     | 268 ++++++++++++++
 .../db/service/basic/QueryFrequencyRecorder.java   |  54 +++
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  | 152 ++++++++
 6 files changed, 552 insertions(+), 367 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index e74f5c0..362a3ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -37,6 +37,7 @@ public class IoTDBConstant {
           : VERSION.split("\\.")[0] + "." + VERSION.split("\\.")[1];
 
   public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER";
+  public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL";
 
   public static final String IOTDB_JMX_PORT = "iotdb.jmx.port";
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 911be23..08ee23c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -19,10 +19,8 @@
 package org.apache.iotdb.db.service;
 
 import org.apache.iotdb.db.auth.AuthException;
-import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
 import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -30,20 +28,16 @@ import org.apache.iotdb.db.conf.OperationType;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.cost.statistic.Operation;
 import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.QueryInBatchStatementException;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
-import org.apache.iotdb.db.exception.runtime.SQLParserException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
 import org.apache.iotdb.db.metrics.server.SqlArgument;
-import org.apache.iotdb.db.qp.Planner;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
@@ -71,22 +65,18 @@ import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryTimeManager;
-import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.query.control.SessionTimeoutManager;
 import org.apache.iotdb.db.query.control.tracing.TracingConstant;
-import org.apache.iotdb.db.query.control.tracing.TracingManager;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
 import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -124,7 +114,6 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
@@ -145,7 +134,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import com.google.common.primitives.Bytes;
-import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,68 +151,32 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.tryCatchQueryException;
+
 /** Thrift RPC implementation at server side. */
-public class TSServiceImpl implements TSIService.Iface {
+public class TSServiceImpl extends BasicServiceProvider implements TSIService.Iface {
 
+  // main logger
   private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceImpl.class);
-  private static final Logger SLOW_SQL_LOGGER = LoggerFactory.getLogger("SLOW_SQL");
-  private static final Logger QUERY_FREQUENCY_LOGGER = LoggerFactory.getLogger("QUERY_FREQUENCY");
-  private static final Logger DETAILED_FAILURE_QUERY_TRACE_LOGGER =
-      LoggerFactory.getLogger("DETAILED_FAILURE_QUERY_TRACE");
-  private static final Logger AUDIT_LOGGER =
-      LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
-
-  private static final String INFO_NOT_LOGIN = "{}: Not login. ";
-  private static final String INFO_PARSING_SQL_ERROR =
-      "Error occurred while parsing SQL to physical plan: ";
-  private static final String INFO_CHECK_METADATA_ERROR = "Check metadata error: ";
-  private static final String INFO_QUERY_PROCESS_ERROR = "Error occurred in query process: ";
-  private static final String INFO_NOT_ALLOWED_IN_BATCH_ERROR =
-      "The query statement is not allowed in batch: ";
+
   private static final String INFO_INTERRUPT_ERROR =
       "Current Thread interrupted when dealing with request {}";
 
-  public static final TSProtocolVersion CURRENT_RPC_VERSION =
-      TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
-
-  private static final int MAX_SIZE =
-      IoTDBDescriptor.getInstance().getConfig().getQueryCacheSizeInMetric();
+  private static final int MAX_SIZE = CONFIG.getQueryCacheSizeInMetric();
   private static final int DELETE_SIZE = 20;
 
-  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
   private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE);
-  private static final AtomicInteger queryCount = new AtomicInteger(0);
-  private final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
-  private final SessionManager sessionManager = SessionManager.getInstance();
-  private final TracingManager tracingManager = TracingManager.getInstance();
 
   private long startTime = -1L;
 
-  protected Planner processor;
-  protected IPlanExecutor executor;
-
   public TSServiceImpl() throws QueryProcessException {
-    processor = new Planner();
-    executor = new PlanExecutor();
-
-    ScheduledExecutorService timedQuerySqlCountThread =
-        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedQuerySqlCount");
-    timedQuerySqlCountThread.scheduleAtFixedRate(
-        () -> {
-          if (queryCount.get() != 0) {
-            QUERY_FREQUENCY_LOGGER.info(
-                "Query count in current 1 minute {} ", queryCount.getAndSet(0));
-          }
-        },
-        config.getFrequencyIntervalInMinute(),
-        config.getFrequencyIntervalInMinute(),
-        TimeUnit.MINUTES);
+    super();
   }
 
   public static List<SqlArgument> getSqlArgumentList() {
@@ -233,74 +185,17 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
-    boolean status;
-    IAuthorizer authorizer;
-    try {
-      authorizer = BasicAuthorizer.getInstance();
-    } catch (AuthException e) {
-      throw new TException(e);
-    }
-    String loginMessage = null;
-    try {
-      status = authorizer.login(req.getUsername(), req.getPassword());
-    } catch (AuthException e) {
-      LOGGER.info("meet error while logging in.", e);
-      status = false;
-      loginMessage = e.getMessage();
-    }
-
-    TSStatus tsStatus;
-    long sessionId = -1;
-    if (status) {
-      // check the version compatibility
-      boolean compatible = checkCompatibility(req.getClient_protocol());
-      if (!compatible) {
-        tsStatus =
-            RpcUtils.getStatus(
-                TSStatusCode.INCOMPATIBLE_VERSION,
-                "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
-        TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
-        resp.setSessionId(sessionId);
-        return resp;
-      }
-
-      tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully");
-
-      sessionId = sessionManager.requestSessionId(req.getUsername(), req.getZoneId());
-      AUDIT_LOGGER.info("User {} opens Session-{}", req.getUsername(), sessionId);
-      LOGGER.info(
-          "{}: Login status: {}. User : {}",
-          IoTDBConstant.GLOBAL_DB_NAME,
-          tsStatus.message,
-          req.getUsername());
-    } else {
-      tsStatus =
-          RpcUtils.getStatus(
-              TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR,
-              loginMessage != null ? loginMessage : "Authentication failed.");
-      AUDIT_LOGGER.info(
-          "User {} opens Session failed with an incorrect password", req.getUsername());
-    }
-
-    SessionTimeoutManager.getInstance().register(sessionId);
-
+    BasicOpenSessionResp openSessionResp =
+        openSession(req.username, req.password, req.zoneId, req.client_protocol);
+    TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
     TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
-    return resp.setSessionId(sessionId);
-  }
-
-  private boolean checkCompatibility(TSProtocolVersion version) {
-    return version.equals(CURRENT_RPC_VERSION);
+    return resp.setSessionId(openSessionResp.getSessionId());
   }
 
   @Override
   public TSStatus closeSession(TSCloseSessionReq req) {
-    long sessionId = req.getSessionId();
-    AUDIT_LOGGER.info("Session-{} is closing", sessionId);
-
-    sessionManager.removeCurrSessionId();
-
     return new TSStatus(
-        !SessionTimeoutManager.getInstance().unregister(sessionId)
+        !closeSession(req.sessionId)
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -313,38 +208,8 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus closeOperation(TSCloseOperationReq req) {
-    if (!checkLogin(req.getSessionId())) {
-      return getNotLoggedInStatus();
-    }
-
-    if (AUDIT_LOGGER.isDebugEnabled()) {
-      AUDIT_LOGGER.debug(
-          "{}: receive close operation from Session {}",
-          IoTDBConstant.GLOBAL_DB_NAME,
-          sessionManager.getCurrSessionId());
-    }
-
-    try {
-      if (req.isSetStatementId()) {
-        if (req.isSetQueryId()) {
-          sessionManager.closeDataset(req.statementId, req.queryId);
-        } else {
-          sessionManager.closeStatement(req.sessionId, req.statementId);
-        }
-        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-      } else {
-        return RpcUtils.getStatus(
-            TSStatusCode.CLOSE_OPERATION_ERROR, "statement id not set by client.");
-      }
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.CLOSE_OPERATION, TSStatusCode.CLOSE_OPERATION_ERROR);
-    }
-  }
-
-  /** release single operation resource */
-  protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    sessionManager.releaseQueryResource(queryId);
+    return closeOperation(
+        req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
   }
 
   @Override
@@ -359,7 +224,7 @@ public class TSServiceImpl implements TSIService.Iface {
     try {
       switch (req.getType()) {
         case "METADATA_IN_JSON":
-          resp.setMetadataInJson(getMetadataInString());
+          resp.setMetadataInJson(IoTDB.metaManager.getMetadataInString());
           status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
           break;
         case "COLUMN":
@@ -389,10 +254,6 @@ public class TSServiceImpl implements TSIService.Iface {
     return resp.setStatus(status);
   }
 
-  private String getMetadataInString() {
-    return IoTDB.metaManager.getMetadataInString();
-  }
-
   protected List<PartialPath> getPaths(PartialPath path) throws MetadataException {
     return IoTDB.metaManager.getFlatMeasurementPaths(path);
   }
@@ -546,7 +407,6 @@ public class TSServiceImpl implements TSIService.Iface {
             multiPlan = new CreateMultiTimeSeriesPlan();
             executeList.add(multiPlan);
           }
-
           TSStatus status = checkAuthority(physicalPlan, req.getSessionId());
           if (status != null) {
             multiPlan.getResults().put(i, status);
@@ -692,7 +552,7 @@ public class TSServiceImpl implements TSIService.Iface {
               req.statementId,
               physicalPlan,
               req.fetchSize,
-              config.getQueryTimeoutThreshold(),
+              CONFIG.getQueryTimeoutThreshold(),
               req.sessionId,
               req.isEnableRedirectQuery(),
               req.isJdbcQuery())
@@ -724,7 +584,7 @@ public class TSServiceImpl implements TSIService.Iface {
               req.statementId,
               physicalPlan,
               req.fetchSize,
-              config.getQueryTimeoutThreshold(),
+              CONFIG.getQueryTimeoutThreshold(),
               req.sessionId,
               req.isEnableRedirectQuery(),
               req.isJdbcQuery())
@@ -758,7 +618,7 @@ public class TSServiceImpl implements TSIService.Iface {
       throws QueryProcessException, SQLException, StorageEngineException,
           QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
           TException, AuthException {
-    queryCount.incrementAndGet();
+    queryFrequencyRecorder.incrementAndGet();
     AUDIT_LOGGER.debug(
         "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
 
@@ -858,7 +718,7 @@ public class TSServiceImpl implements TSIService.Iface {
         tracingManager.setSeriesPathNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum());
       }
 
-      if (config.isEnableMetricService()) {
+      if (CONFIG.isEnableMetricService()) {
         long endTime = System.currentTimeMillis();
         SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, queryStartTime, endTime);
         synchronized (sqlArgumentList) {
@@ -885,7 +745,7 @@ public class TSServiceImpl implements TSIService.Iface {
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, queryStartTime);
       long costTime = System.currentTimeMillis() - queryStartTime;
-      if (costTime >= config.getSlowQueryThreshold()) {
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
         SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
       }
     }
@@ -1075,7 +935,7 @@ public class TSServiceImpl implements TSIService.Iface {
     final SelectIntoPlan selectIntoPlan = (SelectIntoPlan) physicalPlan;
     final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
 
-    queryCount.incrementAndGet();
+    queryFrequencyRecorder.incrementAndGet();
     AUDIT_LOGGER.debug(
         "Session {} execute select into: {}", sessionManager.getCurrSessionId(), statement);
     if (physicalPlan instanceof QueryPlan && ((QueryPlan) physicalPlan).isEnableTracing()) {
@@ -1104,7 +964,7 @@ public class TSServiceImpl implements TSIService.Iface {
       sessionManager.releaseQueryResourceNoExceptions(queryId);
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_SELECT_INTO, startTime);
       long costTime = System.currentTimeMillis() - startTime;
-      if (costTime >= config.getSlowQueryThreshold()) {
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
         SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
       }
     }
@@ -1116,6 +976,7 @@ public class TSServiceImpl implements TSIService.Iface {
     for (int i = 0; i < insertTabletPlans.size(); i++) {
       InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i);
       TSStatus status = checkAuthority(insertTabletPlan, sessionId);
+
       if (status != null) {
         // not authorized
         insertMultiTabletPlan.getResults().put(i, status);
@@ -1222,35 +1083,18 @@ public class TSServiceImpl implements TSIService.Iface {
     }
 
     WatermarkEncoder encoder = null;
-    if (config.isEnableWatermark() && authorizer.isUserUseWaterMark(userName)) {
-      if (config.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
-        encoder = new GroupedLSBWatermarkEncoder(config);
+    if (CONFIG.isEnableWatermark() && authorizer.isUserUseWaterMark(userName)) {
+      if (CONFIG.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
+        encoder = new GroupedLSBWatermarkEncoder(CONFIG);
       } else {
         throw new UnSupportedDataTypeException(
             String.format(
-                "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
+                "Watermark method is not supported yet: %s", CONFIG.getWatermarkMethodName()));
       }
     }
     return encoder;
   }
 
-  /** create QueryDataSet and buffer it for fetchResults */
-  private QueryDataSet createQueryDataSet(
-      QueryContext context, PhysicalPlan physicalPlan, int fetchSize)
-      throws QueryProcessException, QueryFilterOptimizationException, StorageEngineException,
-          IOException, MetadataException, SQLException, TException, InterruptedException {
-
-    QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
-    queryDataSet.setFetchSize(fetchSize);
-    sessionManager.setDataset(context.getQueryId(), queryDataSet);
-    return queryDataSet;
-  }
-
-  protected QueryContext genQueryContext(
-      long queryId, boolean debug, long startTime, String statement, long timeout) {
-    return new QueryContext(queryId, debug, startTime, statement, timeout);
-  }
-
   /** update statement can be: 1. select-into statement 2. non-query statement */
   @Override
   public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
@@ -1307,30 +1151,6 @@ public class TSServiceImpl implements TSIService.Iface {
             .setQueryId(sessionManager.requestQueryId(false));
   }
 
-  /**
-   * Check whether current user has logged in.
-   *
-   * @return true: If logged in; false: If not logged in
-   */
-  private boolean checkLogin(long sessionId) {
-    boolean isLoggedIn = sessionManager.getUsername(sessionId) != null;
-    if (!isLoggedIn) {
-      LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
-    } else {
-      SessionTimeoutManager.getInstance().refresh(sessionId);
-    }
-    return isLoggedIn;
-  }
-
-  private boolean checkAuthorization(List<PartialPath> paths, PhysicalPlan plan, String username)
-      throws AuthException {
-    String targetUser = null;
-    if (plan instanceof AuthorPlan) {
-      targetUser = ((AuthorPlan) plan).getUserName();
-    }
-    return AuthorityChecker.check(username, paths, plan.getOperatorType(), targetUser);
-  }
-
   protected void handleClientExit() {
     Long sessionId = sessionManager.getCurrSessionId();
     if (sessionId != null) {
@@ -1535,6 +1355,7 @@ public class TSServiceImpl implements TSIService.Iface {
         plan.setNeedInferType(true);
         plan.setAligned(req.isAligned);
         TSStatus status = checkAuthority(plan, req.getSessionId());
+
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
           allCheckSuccess = false;
@@ -1641,7 +1462,6 @@ public class TSServiceImpl implements TSIService.Iface {
               req.getMeasurements().toArray(new String[0]),
               req.values,
               req.isAligned);
-
       TSStatus status = checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -1673,7 +1493,6 @@ public class TSServiceImpl implements TSIService.Iface {
       plan.setValues(req.getValues().toArray(new Object[0]));
       plan.setNeedInferType(true);
       plan.setAligned(req.isAligned);
-
       TSStatus status = checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -1699,8 +1518,8 @@ public class TSServiceImpl implements TSIService.Iface {
         paths.add(new PartialPath(path));
       }
       plan.addPaths(paths);
-
       TSStatus status = checkAuthority(plan, req.getSessionId());
+
       return status != null ? new TSStatus(status) : new TSStatus(executeNonQueryPlan(plan));
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.DELETE_DATA, e.getErrorCode());
@@ -1729,8 +1548,8 @@ public class TSServiceImpl implements TSIService.Iface {
       insertTabletPlan.setRowCount(req.size);
       insertTabletPlan.setDataTypes(req.types);
       insertTabletPlan.setAligned(req.isAligned);
-
       TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
+
       return status != null ? status : executeNonQueryPlan(insertTabletPlan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_TABLET, e.getErrorCode());
@@ -1811,8 +1630,8 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup));
-
       TSStatus status = checkAuthority(plan, sessionId);
+
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.SET_STORAGE_GROUP, e.getErrorCode());
@@ -1834,7 +1653,6 @@ public class TSServiceImpl implements TSIService.Iface {
         storageGroupList.add(new PartialPath(storageGroup));
       }
       DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList);
-
       TSStatus status = checkAuthority(plan, sessionId);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -1867,7 +1685,6 @@ public class TSServiceImpl implements TSIService.Iface {
               req.tags,
               req.attributes,
               req.measurementAlias);
-
       TSStatus status = checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -1921,7 +1738,6 @@ public class TSServiceImpl implements TSIService.Iface {
               encodings,
               CompressionType.values()[req.compressor],
               req.measurementAlias);
-
       TSStatus status = checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -1974,7 +1790,6 @@ public class TSServiceImpl implements TSIService.Iface {
       CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
       for (int i = 0; i < req.paths.size(); i++) {
         plan.setPath(new PartialPath(req.paths.get(i)));
-
         TSStatus status = checkAuthority(plan, req.getSessionId());
         if (status != null) {
           // not authorized
@@ -2033,7 +1848,6 @@ public class TSServiceImpl implements TSIService.Iface {
         pathList.add(new PartialPath(path));
       }
       DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
-
       TSStatus status = checkAuthority(plan, sessionId);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -2098,7 +1912,6 @@ public class TSServiceImpl implements TSIService.Iface {
             new CreateTemplatePlan(
                 req.getName(), measurements, dataTypes, encodings, compressionTypes);
       }
-
       TSStatus status = checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (Exception e) {
@@ -2125,7 +1938,6 @@ public class TSServiceImpl implements TSIService.Iface {
     AppendTemplatePlan plan =
         new AppendTemplatePlan(
             req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes);
-
     TSStatus status = checkAuthority(plan, req.getSessionId());
     return status != null ? status : executeNonQueryPlan(plan);
   }
@@ -2187,7 +1999,6 @@ public class TSServiceImpl implements TSIService.Iface {
     }
 
     SetSchemaTemplatePlan plan = new SetSchemaTemplatePlan(req.templateName, req.prefixPath);
-
     TSStatus status = checkAuthority(plan, req.getSessionId());
     return status != null ? status : executeNonQueryPlan(plan);
   }
@@ -2207,162 +2018,27 @@ public class TSServiceImpl implements TSIService.Iface {
     }
 
     UnsetSchemaTemplatePlan plan = new UnsetSchemaTemplatePlan(req.prefixPath, req.templateName);
-
     TSStatus status = checkAuthority(plan, req.getSessionId());
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
-  private TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
-    List<PartialPath> paths = plan.getPaths();
-    try {
-      if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
-        return RpcUtils.getStatus(
-            TSStatusCode.NO_PERMISSION_ERROR,
-            "No permissions for this operation " + plan.getOperatorType());
-      }
-    } catch (AuthException e) {
-      LOGGER.warn("meet error while checking authorization.", e);
-      return RpcUtils.getStatus(TSStatusCode.UNINITIALIZED_AUTH_ERROR, e.getMessage());
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.CHECK_AUTHORITY, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    }
-    return null;
-  }
-
   protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
-    boolean isSuccessful;
     try {
-      plan.checkIntegrity();
-      isSuccessful = executeNonQuery(plan);
+      return executeNonQuery(plan)
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
+          : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
     } catch (Exception e) {
       return onNonQueryException(e, OperationType.EXECUTE_NON_QUERY_PLAN);
     }
-
-    return isSuccessful
-        ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
-        : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
-  }
-
-  private boolean executeNonQuery(PhysicalPlan plan)
-      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
-    if (!(plan instanceof SetSystemModePlan)
-        && !(plan instanceof FlushPlan)
-        && IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
-      throw new QueryProcessException(
-          "Current system mode is read-only, does not support non-query operation");
-    }
-    return executor.processNonQuery(plan);
   }
 
   protected TSDataType getSeriesTypeByPath(PartialPath path) throws MetadataException {
     return SchemaUtils.getSeriesTypeByPath(path);
   }
 
-  private TSStatus onQueryException(Exception e, String operation) {
-    TSStatus status = tryCatchQueryException(e);
-    return status != null
-        ? status
-        : onNPEOrUnexpectedException(e, operation, TSStatusCode.INTERNAL_SERVER_ERROR);
-  }
-
-  private TSStatus onQueryException(Exception e, OperationType operation) {
-    return onQueryException(e, operation.getName());
-  }
-
-  private TSStatus tryCatchQueryException(Exception e) {
-    if (e instanceof QueryTimeoutRuntimeException) {
-      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(e.getMessage(), e);
-      return RpcUtils.getStatus(TSStatusCode.TIME_OUT, getRootCause(e));
-    } else if (e instanceof ParseCancellationException) {
-      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_PARSING_SQL_ERROR, e);
-      return RpcUtils.getStatus(
-          TSStatusCode.SQL_PARSE_ERROR, INFO_PARSING_SQL_ERROR + getRootCause(e));
-    } else if (e instanceof SQLParserException) {
-      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_CHECK_METADATA_ERROR, e);
-      return RpcUtils.getStatus(
-          TSStatusCode.METADATA_ERROR, INFO_CHECK_METADATA_ERROR + getRootCause(e));
-    } else if (e instanceof QueryProcessException) {
-      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_QUERY_PROCESS_ERROR, e);
-      return RpcUtils.getStatus(
-          TSStatusCode.QUERY_PROCESS_ERROR, INFO_QUERY_PROCESS_ERROR + getRootCause(e));
-    } else if (e instanceof QueryInBatchStatementException) {
-      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_NOT_ALLOWED_IN_BATCH_ERROR, e);
-      return RpcUtils.getStatus(
-          TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR + getRootCause(e));
-    } else if (e instanceof IoTDBException && !(e instanceof StorageGroupNotReadyException)) {
-      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_QUERY_PROCESS_ERROR, e);
-      return RpcUtils.getStatus(((IoTDBException) e).getErrorCode(), getRootCause(e));
-    }
-    return null;
-  }
-
-  private TSStatus onNonQueryException(Exception e, String operation) {
-    TSStatus status = tryCatchNonQueryException(e);
-    return status != null
-        ? status
-        : onNPEOrUnexpectedException(e, operation, TSStatusCode.INTERNAL_SERVER_ERROR);
-  }
-
-  private TSStatus onNonQueryException(Exception e, OperationType operation) {
-    return onNonQueryException(e, operation.getName());
-  }
-
-  private TSStatus tryCatchNonQueryException(Exception e) {
-    String message = "Exception occurred while processing non-query. ";
-    if (e instanceof BatchProcessException) {
-      LOGGER.warn(message, e);
-      return RpcUtils.getStatus(Arrays.asList(((BatchProcessException) e).getFailingStatus()));
-    } else if (e instanceof IoTDBException && !(e instanceof StorageGroupNotReadyException)) {
-      if (((IoTDBException) e).isUserException()) {
-        LOGGER.warn(message + e.getMessage());
-      } else {
-        LOGGER.warn(message, e);
-      }
-      return RpcUtils.getStatus(((IoTDBException) e).getErrorCode(), getRootCause(e));
-    }
-    return null;
-  }
-
-  private TSStatus onNPEOrUnexpectedException(
-      Exception e, String operation, TSStatusCode statusCode) {
-    String message = String.format("[%s] Exception occurred: %s failed. ", statusCode, operation);
-    if (e instanceof NullPointerException) {
-      LOGGER.error("Status code: {}, operation: {} failed", statusCode, operation, e);
-    } else {
-      LOGGER.warn("Status code: {}, operation: {} failed", statusCode, operation, e);
-    }
-    return RpcUtils.getStatus(statusCode, message + e.getMessage());
-  }
-
-  private TSStatus onNPEOrUnexpectedException(
-      Exception e, OperationType operation, TSStatusCode statusCode) {
-    return onNPEOrUnexpectedException(e, operation.getName(), statusCode);
-  }
-
-  private TSStatus onIoTDBException(Exception e, String operation, int errorCode) {
-    TSStatusCode statusCode = TSStatusCode.representOf(errorCode);
-    String message =
-        String.format(
-            "[%s] Exception occurred: %s failed. %s", statusCode, operation, e.getMessage());
-    LOGGER.warn("Status code: {}, operation: {} failed", statusCode, operation, e);
-    return RpcUtils.getStatus(errorCode, message);
-  }
-
-  private TSStatus onIoTDBException(Exception e, OperationType operation, int errorCode) {
-    return onIoTDBException(e, operation.getName(), errorCode);
-  }
-
   private TSStatus getNotLoggedInStatus() {
     return RpcUtils.getStatus(
         TSStatusCode.NOT_LOGIN_ERROR,
         "Log in failed. Either you are not authorized or the session has timed out.");
   }
-
-  private String getRootCause(Throwable e) {
-    while (e.getCause() != null) {
-      e = e.getCause();
-    }
-    return e.getMessage();
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicOpenSessionResp.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicOpenSessionResp.java
new file mode 100644
index 0000000..755d9d7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicOpenSessionResp.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.basic;
+
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+/** A {@link TSStatus} that additionally carries a session id. */
+public class BasicOpenSessionResp extends TSStatus {
+
+  /** Session id attached to this response; written through {@link #sessionId(long)}. */
+  private long sessionId;
+
+  /** Returns the session id currently stored in this response. */
+  public long getSessionId() {
+    return this.sessionId;
+  }
+
+  /**
+   * Stores the given session id and hands back this response so that calls can be chained.
+   *
+   * @param sessionId the session id to attach
+   * @return this instance
+   */
+  public BasicOpenSessionResp sessionId(long sessionId) {
+    this.sessionId = sessionId;
+    return this;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
new file mode 100644
index 0000000..0067bba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.basic;
+
+import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
+import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.IPlanExecutor;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.SessionTimeoutManager;
+import org.apache.iotdb.db.query.control.tracing.TracingManager;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+
+public class BasicServiceProvider {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BasicServiceProvider.class);
+  protected static final Logger AUDIT_LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
+  protected static final Logger SLOW_SQL_LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME);
+
+  protected static final TSProtocolVersion CURRENT_RPC_VERSION =
+      TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
+
+  protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  protected final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+  protected final SessionManager sessionManager = SessionManager.getInstance();
+  protected final TracingManager tracingManager = TracingManager.getInstance();
+  protected final QueryFrequencyRecorder queryFrequencyRecorder;
+
+  protected Planner processor;
+  protected IPlanExecutor executor;
+
+  /**
+   * Creates the provider together with its query frequency recorder, planner and plan executor.
+   *
+   * @throws QueryProcessException declared for the construction of the collaborators
+   */
+  public BasicServiceProvider() throws QueryProcessException {
+    // The recorder is built first, exactly as before, so the construction order is unchanged.
+    this.queryFrequencyRecorder = new QueryFrequencyRecorder(CONFIG);
+    this.processor = new Planner();
+    this.executor = new PlanExecutor();
+  }
+
+  /**
+   * Reports whether the session is logged in, i.e. whether the session manager returns a
+   * non-null username for it. A logged-in session also gets its timeout entry refreshed.
+   *
+   * @param sessionId id of the session to look up
+   * @return {@code true} if a username is bound to {@code sessionId}, {@code false} otherwise
+   */
+  protected boolean checkLogin(long sessionId) {
+    if (sessionManager.getUsername(sessionId) == null) {
+      LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
+      return false;
+    }
+    SessionTimeoutManager.getInstance().refresh(sessionId);
+    return true;
+  }
+
+  protected boolean checkAuthorization(List<PartialPath> paths, PhysicalPlan plan, String username)
+      throws AuthException {
+    String targetUser = null;
+    if (plan instanceof AuthorPlan) {
+      targetUser = ((AuthorPlan) plan).getUserName();
+    }
+    return AuthorityChecker.check(username, paths, plan.getOperatorType(), targetUser);
+  }
+
+  protected TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
+    List<PartialPath> paths = plan.getPaths();
+    try {
+      if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
+        return RpcUtils.getStatus(
+            TSStatusCode.NO_PERMISSION_ERROR,
+            "No permissions for this operation " + plan.getOperatorType());
+      }
+    } catch (AuthException e) {
+      LOGGER.warn("meet error while checking authorization.", e);
+      return RpcUtils.getStatus(TSStatusCode.UNINITIALIZED_AUTH_ERROR, e.getMessage());
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.CHECK_AUTHORITY, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
+    return null;
+  }
+
+  /**
+   * Authenticates a user and, only when the login succeeds, requests a session for it.
+   *
+   * <p>The returned response carries a status code, a message and a session id. The session id
+   * is {@code -1} when the login is rejected or when the client protocol version is not
+   * compatible with {@link #CURRENT_RPC_VERSION}.
+   *
+   * @param username name of the user logging in
+   * @param password password presented by the client
+   * @param zoneId zone id handed to the session manager when the session is requested
+   * @param tsProtocolVersion protocol version announced by the client
+   * @return the login outcome together with the session id
+   * @throws TException if the authorizer instance cannot be obtained
+   */
+  protected BasicOpenSessionResp openSession(
+      String username, String password, String zoneId, TSProtocolVersion tsProtocolVersion)
+      throws TException {
+    BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
+
+    boolean status;
+    IAuthorizer authorizer;
+    try {
+      authorizer = BasicAuthorizer.getInstance();
+    } catch (AuthException e) {
+      throw new TException(e);
+    }
+    String loginMessage = null;
+    try {
+      status = authorizer.login(username, password);
+    } catch (AuthException e) {
+      // A failing authorizer is reported to the client as a rejected login, with its message.
+      LOGGER.info("meet error while logging in.", e);
+      status = false;
+      loginMessage = e.getMessage();
+    }
+
+    long sessionId = -1;
+    if (status) {
+      // check the version compatibility
+      boolean compatible = checkCompatibility(tsProtocolVersion);
+      if (!compatible) {
+        openSessionResp.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode());
+        openSessionResp.setMessage(
+            "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
+        return openSessionResp.sessionId(sessionId);
+      }
+
+      openSessionResp.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      openSessionResp.setMessage("Login successfully");
+
+      sessionId = sessionManager.requestSessionId(username, zoneId);
+      AUDIT_LOGGER.info("User {} opens Session-{}", username, sessionId);
+      LOGGER.info(
+          "{}: Login status: {}. User : {}",
+          IoTDBConstant.GLOBAL_DB_NAME,
+          openSessionResp.getMessage(),
+          username);
+    } else {
+      openSessionResp.setMessage(loginMessage != null ? loginMessage : "Authentication failed.");
+      openSessionResp.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.getStatusCode());
+
+      // No session is requested for a rejected login. requestSessionId is the call the success
+      // branch uses to create a session, and checkLogin only tests whether a username is bound
+      // to a session id, so requesting one here would give an unauthenticated client a session
+      // that passes the login check. The id therefore stays -1.
+      AUDIT_LOGGER.info("User {} opens Session failed with an incorrect password", username);
+    }
+
+    // NOTE(review): on a rejected login this registers the placeholder id -1 with the timeout
+    // manager; kept as is, confirm whether it should be limited to the success branch.
+    SessionTimeoutManager.getInstance().register(sessionId);
+    return openSessionResp.sessionId(sessionId);
+  }
+
+  protected boolean closeSession(long sessionId) {
+    AUDIT_LOGGER.info("Session-{} is closing", sessionId);
+
+    sessionManager.removeCurrSessionId();
+
+    return SessionTimeoutManager.getInstance().unregister(sessionId);
+  }
+
+  protected TSStatus closeOperation(
+      long sessionId,
+      long queryId,
+      long statementId,
+      boolean haveStatementId,
+      boolean haveSetQueryId) {
+    if (!checkLogin(sessionId)) {
+      return RpcUtils.getStatus(
+          TSStatusCode.NOT_LOGIN_ERROR,
+          "Log in failed. Either you are not authorized or the session has timed out.");
+    }
+
+    if (AUDIT_LOGGER.isDebugEnabled()) {
+      AUDIT_LOGGER.debug(
+          "{}: receive close operation from Session {}",
+          IoTDBConstant.GLOBAL_DB_NAME,
+          sessionManager.getCurrSessionId());
+    }
+
+    try {
+      if (haveStatementId) {
+        if (haveSetQueryId) {
+          sessionManager.closeDataset(statementId, queryId);
+        } else {
+          sessionManager.closeStatement(sessionId, statementId);
+        }
+        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+      } else {
+        return RpcUtils.getStatus(
+            TSStatusCode.CLOSE_OPERATION_ERROR, "statement id not set by client.");
+      }
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.CLOSE_OPERATION, TSStatusCode.CLOSE_OPERATION_ERROR);
+    }
+  }
+
+  protected QueryContext genQueryContext(
+      long queryId, boolean debug, long startTime, String statement, long timeout) {
+    return new QueryContext(queryId, debug, startTime, statement, timeout);
+  }
+
+  /** create QueryDataSet and buffer it for fetchResults */
+  protected QueryDataSet createQueryDataSet(
+      QueryContext context, PhysicalPlan physicalPlan, int fetchSize)
+      throws QueryProcessException, QueryFilterOptimizationException, StorageEngineException,
+          IOException, MetadataException, SQLException, TException, InterruptedException {
+
+    QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
+    queryDataSet.setFetchSize(fetchSize);
+    sessionManager.setDataset(context.getQueryId(), queryDataSet);
+    return queryDataSet;
+  }
+
+  protected boolean executeNonQuery(PhysicalPlan plan)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+    plan.checkIntegrity();
+    if (!(plan instanceof SetSystemModePlan)
+        && !(plan instanceof FlushPlan)
+        && IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+      throw new QueryProcessException(
+          "Current system mode is read-only, does not support non-query operation");
+    }
+    return executor.processNonQuery(plan);
+  }
+
+  /**
+   * Releases the resources of a single query by delegating to the session manager.
+   *
+   * @param queryId id of the query whose resources are released
+   * @throws StorageEngineException propagated from the session manager
+   */
+  protected void releaseQueryResource(long queryId) throws StorageEngineException {
+    this.sessionManager.releaseQueryResource(queryId);
+  }
+
+  /**
+   * Tells whether the client's protocol version equals {@link #CURRENT_RPC_VERSION}.
+   *
+   * @param version protocol version announced by the client; may be {@code null}
+   * @return {@code true} only if {@code version} equals the current version; a {@code null}
+   *     version is reported as incompatible
+   */
+  private boolean checkCompatibility(TSProtocolVersion version) {
+    // The constant is the receiver so that a missing client version yields false, not an NPE.
+    return CURRENT_RPC_VERSION.equals(version);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
new file mode 100644
index 0000000..0c5cf6f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.basic;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class QueryFrequencyRecorder {
+
+  private static final Logger QUERY_FREQUENCY_LOGGER = LoggerFactory.getLogger("QUERY_FREQUENCY");
+  private static final AtomicInteger QUERY_COUNT = new AtomicInteger(0);
+
+  public QueryFrequencyRecorder(IoTDBConfig config) {
+    ScheduledExecutorService timedQuerySqlCountThread =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedQuerySqlCount");
+    timedQuerySqlCountThread.scheduleAtFixedRate(
+        () -> {
+          if (QUERY_COUNT.get() != 0) {
+            QUERY_FREQUENCY_LOGGER.info(
+                "Query count in current 1 minute {} ", QUERY_COUNT.getAndSet(0));
+          }
+        },
+        config.getFrequencyIntervalInMinute(),
+        config.getFrequencyIntervalInMinute(),
+        TimeUnit.MINUTES);
+  }
+
+  public void incrementAndGet() {
+    QUERY_COUNT.incrementAndGet();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
new file mode 100644
index 0000000..41db954
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.QueryInBatchStatementException;
+import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
+import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.antlr.v4.runtime.misc.ParseCancellationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class ErrorHandlingUtils {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ErrorHandlingUtils.class);
+  private static final Logger DETAILED_FAILURE_QUERY_TRACE_LOGGER =
+      LoggerFactory.getLogger("DETAILED_FAILURE_QUERY_TRACE");
+
+  private static final String INFO_PARSING_SQL_ERROR =
+      "Error occurred while parsing SQL to physical plan: ";
+  private static final String INFO_CHECK_METADATA_ERROR = "Check metadata error: ";
+  private static final String INFO_QUERY_PROCESS_ERROR = "Error occurred in query process: ";
+  private static final String INFO_NOT_ALLOWED_IN_BATCH_ERROR =
+      "The query statement is not allowed in batch: ";
+
+  /**
+   * Logs an unexpected exception and converts it into a status for the client.
+   *
+   * <p>A {@link NullPointerException} is logged at error level, any other exception at warn
+   * level.
+   *
+   * @param e the exception that occurred
+   * @param operation name of the failed operation, used in the log entry and the message
+   * @param statusCode status code to report
+   * @return a status with {@code statusCode} and a message ending in {@code e.getMessage()}
+   */
+  public static TSStatus onNPEOrUnexpectedException(
+      Exception e, String operation, TSStatusCode statusCode) {
+    final String prefix =
+        String.format("[%s] Exception occurred: %s failed. ", statusCode, operation);
+    final boolean isNullPointer = e instanceof NullPointerException;
+    if (isNullPointer) {
+      LOGGER.error("Status code: {}, operation: {} failed", statusCode, operation, e);
+    } else {
+      LOGGER.warn("Status code: {}, operation: {} failed", statusCode, operation, e);
+    }
+    return RpcUtils.getStatus(statusCode, prefix + e.getMessage());
+  }
+
+  /**
+   * Variant of {@link #onNPEOrUnexpectedException(Exception, String, TSStatusCode)} that takes
+   * the operation as an {@link OperationType} and uses its name.
+   */
+  public static TSStatus onNPEOrUnexpectedException(
+      Exception e, OperationType operation, TSStatusCode statusCode) {
+    final String operationName = operation.getName();
+    return onNPEOrUnexpectedException(e, operationName, statusCode);
+  }
+
+  public static String getRootCause(Throwable e) {
+    while (e.getCause() != null) {
+      e = e.getCause();
+    }
+    return e.getMessage();
+  }
+
+  /**
+   * Converts an exception raised while processing a query into a status.
+   *
+   * @param e the exception that occurred
+   * @param operation name of the failed operation, used only for the fallback status
+   * @return the status from {@link #tryCatchQueryException(Exception)} when it recognises the
+   *     exception, otherwise an {@code INTERNAL_SERVER_ERROR} status
+   */
+  public static TSStatus onQueryException(Exception e, String operation) {
+    final TSStatus recognised = tryCatchQueryException(e);
+    if (recognised != null) {
+      return recognised;
+    }
+    return onNPEOrUnexpectedException(e, operation, TSStatusCode.INTERNAL_SERVER_ERROR);
+  }
+
+  /**
+   * Variant of {@link #onQueryException(Exception, String)} that takes the operation as an
+   * {@link OperationType} and uses its name.
+   */
+  public static TSStatus onQueryException(Exception e, OperationType operation) {
+    final String operationName = operation.getName();
+    return onQueryException(e, operationName);
+  }
+
+  /**
+   * Maps the exception types known to the query path onto a status, logging each recognised
+   * exception to the {@code DETAILED_FAILURE_QUERY_TRACE} logger at warn level.
+   *
+   * <p>The type tests run in a fixed order and the first match wins. A {@link
+   * StorageGroupNotReadyException} is deliberately not handled by the generic {@link
+   * IoTDBException} case.
+   *
+   * @param e the exception to classify
+   * @return the matching status, or {@code null} if the exception is not recognised
+   */
+  public static TSStatus tryCatchQueryException(Exception e) {
+    if (e instanceof QueryTimeoutRuntimeException) {
+      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(e.getMessage(), e);
+      return RpcUtils.getStatus(TSStatusCode.TIME_OUT, getRootCause(e));
+    }
+    if (e instanceof ParseCancellationException) {
+      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_PARSING_SQL_ERROR, e);
+      return RpcUtils.getStatus(
+          TSStatusCode.SQL_PARSE_ERROR, INFO_PARSING_SQL_ERROR + getRootCause(e));
+    }
+    if (e instanceof SQLParserException) {
+      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_CHECK_METADATA_ERROR, e);
+      return RpcUtils.getStatus(
+          TSStatusCode.METADATA_ERROR, INFO_CHECK_METADATA_ERROR + getRootCause(e));
+    }
+    if (e instanceof QueryProcessException) {
+      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_QUERY_PROCESS_ERROR, e);
+      return RpcUtils.getStatus(
+          TSStatusCode.QUERY_PROCESS_ERROR, INFO_QUERY_PROCESS_ERROR + getRootCause(e));
+    }
+    if (e instanceof QueryInBatchStatementException) {
+      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_NOT_ALLOWED_IN_BATCH_ERROR, e);
+      return RpcUtils.getStatus(
+          TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR + getRootCause(e));
+    }
+    if (e instanceof IoTDBException && !(e instanceof StorageGroupNotReadyException)) {
+      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_QUERY_PROCESS_ERROR, e);
+      return RpcUtils.getStatus(((IoTDBException) e).getErrorCode(), getRootCause(e));
+    }
+    // Not an exception this method knows how to translate.
+    return null;
+  }
+
+  /**
+   * Converts an exception raised while processing a non-query into a status.
+   *
+   * @param e the exception that occurred
+   * @param operation name of the failed operation, used only for the fallback status
+   * @return the status from {@link #tryCatchNonQueryException(Exception)} when it recognises
+   *     the exception, otherwise an {@code INTERNAL_SERVER_ERROR} status
+   */
+  public static TSStatus onNonQueryException(Exception e, String operation) {
+    final TSStatus recognised = tryCatchNonQueryException(e);
+    if (recognised != null) {
+      return recognised;
+    }
+    return onNPEOrUnexpectedException(e, operation, TSStatusCode.INTERNAL_SERVER_ERROR);
+  }
+
+  /**
+   * Variant of {@link #onNonQueryException(Exception, String)} that takes the operation as an
+   * {@link OperationType} and uses its name.
+   */
+  public static TSStatus onNonQueryException(Exception e, OperationType operation) {
+    final String operationName = operation.getName();
+    return onNonQueryException(e, operationName);
+  }
+
+  /**
+   * Maps the exception types known to the non-query path onto a status.
+   *
+   * <p>A {@link BatchProcessException} yields a status built from its failing statuses. Any
+   * other {@link IoTDBException}, except {@link StorageGroupNotReadyException}, yields a
+   * status with its error code and root-cause message; user exceptions are logged without a
+   * stack trace.
+   *
+   * @param e the exception to classify
+   * @return the matching status, or {@code null} if the exception is not recognised
+   */
+  public static TSStatus tryCatchNonQueryException(Exception e) {
+    final String logPrefix = "Exception occurred while processing non-query. ";
+    if (e instanceof BatchProcessException) {
+      LOGGER.warn(logPrefix, e);
+      final BatchProcessException batchFailure = (BatchProcessException) e;
+      return RpcUtils.getStatus(Arrays.asList(batchFailure.getFailingStatus()));
+    }
+    if (!(e instanceof IoTDBException) || e instanceof StorageGroupNotReadyException) {
+      return null;
+    }
+    final IoTDBException iotdbFailure = (IoTDBException) e;
+    if (iotdbFailure.isUserException()) {
+      LOGGER.warn(logPrefix + e.getMessage());
+    } else {
+      LOGGER.warn(logPrefix, e);
+    }
+    return RpcUtils.getStatus(iotdbFailure.getErrorCode(), getRootCause(e));
+  }
+
+  /**
+   * Logs an exception at warn level and converts it into a status carrying the given error
+   * code.
+   *
+   * @param e the exception that occurred
+   * @param operation name of the failed operation, used in the log entry and the message
+   * @param errorCode numeric error code to report; also resolved to a {@link TSStatusCode} for
+   *     the log entry and the message text
+   * @return a status with {@code errorCode} and a message that includes {@code e.getMessage()}
+   */
+  public static TSStatus onIoTDBException(Exception e, String operation, int errorCode) {
+    final TSStatusCode resolvedCode = TSStatusCode.representOf(errorCode);
+    final String failureText =
+        String.format(
+            "[%s] Exception occurred: %s failed. %s", resolvedCode, operation, e.getMessage());
+    LOGGER.warn("Status code: {}, operation: {} failed", resolvedCode, operation, e);
+    return RpcUtils.getStatus(errorCode, failureText);
+  }
+
+  /**
+   * Variant of {@link #onIoTDBException(Exception, String, int)} that takes the operation as
+   * an {@link OperationType} and uses its name.
+   */
+  public static TSStatus onIoTDBException(Exception e, OperationType operation, int errorCode) {
+    final String operationName = operation.getName();
+    return onIoTDBException(e, operationName, errorCode);
+  }
+}