You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/29 05:59:08 UTC

[iotdb] 01/02: [To rel/1.0] Add SHOW_CQ privilege

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-5217-1-0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2a63e74ed8ff317b9ec1780a3ac76f499d44b0af
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Dec 29 13:51:42 2022 +0800

    [To rel/1.0] Add SHOW_CQ privilege
---
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |    6 +-
 .../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java  |   88 ++
 .../iotdb/commons/auth/entity/PrivilegeType.java   |    3 +-
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |    2 +
 .../iotdb/db/query/control/SessionManager.java     |   22 -
 .../db/service/thrift/impl/TSServiceImpl.java      | 1430 --------------------
 6 files changed, 97 insertions(+), 1454 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 5df211d6c1..f531c27425 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -716,7 +716,7 @@ PRIVILEGE_VALUE
     | GRANT_USER_PRIVILEGE | REVOKE_USER_PRIVILEGE | GRANT_USER_ROLE | REVOKE_USER_ROLE
     | CREATE_ROLE | DELETE_ROLE | LIST_ROLE | GRANT_ROLE_PRIVILEGE | REVOKE_ROLE_PRIVILEGE
     | CREATE_FUNCTION | DROP_FUNCTION | CREATE_TRIGGER | DROP_TRIGGER | START_TRIGGER | STOP_TRIGGER
-    | CREATE_CONTINUOUS_QUERY | DROP_CONTINUOUS_QUERY
+    | CREATE_CONTINUOUS_QUERY | DROP_CONTINUOUS_QUERY | SHOW_CONTINUOUS_QUERIES
     | APPLY_TEMPLATE | UPDATE_TEMPLATE | READ_TEMPLATE | READ_TEMPLATE_APPLICATION
     ;
 
@@ -840,6 +840,10 @@ DROP_CONTINUOUS_QUERY
     : D R O P '_' C O N T I N U O U S '_' Q U E R Y
     ;
 
+SHOW_CONTINUOUS_QUERIES
+    : S H O W '_' C O N T I N U O U S '_' Q U E R I E S
+    ;
+
 SCHEMA_REPLICATION_FACTOR
     : S C H E M A '_' R E P L I C A T I O N '_' F A C T O R
     ;
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
index 3a520e5a71..b03ddec6f1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
@@ -481,6 +481,94 @@ public class IoTDBCQIT {
     }
   }
 
+  @Test
+  public void testShowAuth() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      String[] cqIds = {"show_cq_1", "show_cq_2", "show_cq_3", "show_cq_4"};
+      String[] cqSQLs = {
+        "CREATE CQ show_cq_1 \n"
+            + "RESAMPLE \n"
+            + "  EVERY 30m\n"
+            + "  BOUNDARY 0\n"
+            + "  RANGE 30m, 10m\n"
+            + "TIMEOUT POLICY BLOCKED\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(30m)\n"
+            + "END",
+        "CREATE CQ show_cq_2\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(30m)\n"
+            + "END",
+        "CREATE CQ show_cq_3\n"
+            + "RESAMPLE RANGE 30m, 0m\n"
+            + "TIMEOUT POLICY DISCARD\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(10m)\n"
+            + "END",
+        "CREATE CQ show_cq_4\n"
+            + "RESAMPLE EVERY 30m \n"
+            + "TIMEOUT POLICY DISCARD\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(10m)\n"
+            + "END"
+      };
+
+      for (String sql : cqSQLs) {
+        statement.execute(sql);
+      }
+
+      statement.execute("CREATE USER `zmty` 'zmty'");
+
+      try (Connection connection2 = EnvFactory.getEnv().getConnection("zmty", "zmty");
+          Statement statement2 = connection2.createStatement()) {
+        try {
+          statement2.executeQuery("show CQS");
+          fail();
+        } catch (Exception e) {
+          assertEquals(
+              TSStatusCode.NO_PERMISSION.getStatusCode()
+                  + ": No permissions for this operation, please add privilege SHOW_CONTINUOUS_QUERIES",
+              e.getMessage());
+        }
+
+        statement.execute("GRANT USER `zmty` PRIVILEGES SHOW_CONTINUOUS_QUERIES");
+
+        try (ResultSet resultSet = statement2.executeQuery("show CQS")) {
+
+          int cnt = 0;
+          while (resultSet.next()) {
+            // No need to add time column for aggregation query
+            assertEquals(cqIds[cnt], resultSet.getString(1));
+            assertEquals(cqSQLs[cnt], resultSet.getString(2));
+            assertEquals("ACTIVE", resultSet.getString(3));
+            cnt++;
+          }
+          assertEquals(cqIds.length, cnt);
+        }
+      }
+
+      for (String cqId : cqIds) {
+        statement.execute(String.format("DROP CQ %s;", cqId));
+      }
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
   // =======================================drop cq======================================
   @Test
   public void testDropCQ() {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java b/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java
index 886d98d80b..36c2ecadca 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java
@@ -55,7 +55,8 @@ public enum PrivilegeType {
   UPDATE_TEMPLATE,
   READ_TEMPLATE,
   APPLY_TEMPLATE(true),
-  READ_TEMPLATE_APPLICATION;
+  READ_TEMPLATE_APPLICATION,
+  SHOW_CONTINUOUS_QUERIES;
 
   private static final int PRIVILEGE_COUNT = values().length;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index 40fc04e1d2..598ed232b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -360,6 +360,8 @@ public class AuthorityChecker {
       case SHOW_PATH_SET_SCHEMA_TEMPLATE:
       case SHOW_PATH_USING_SCHEMA_TEMPLATE:
         return PrivilegeType.READ_TEMPLATE_APPLICATION.ordinal();
+      case SHOW_CONTINUOUS_QUERIES:
+        return PrivilegeType.SHOW_CONTINUOUS_QUERIES.ordinal();
       default:
         logger.error("Unrecognizable operator type ({}) for AuthorityChecker.", type);
         return -1;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 68cc811b6e..cf9250aff9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.query.control;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
-import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -53,7 +52,6 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 
 public class SessionManager implements SessionManagerMBean {
   private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
@@ -266,26 +264,6 @@ public class SessionManager implements SessionManagerMBean {
         username, plan.getAuthPaths(), plan.getOperatorType(), targetUser);
   }
 
-  /** Check whether specific Session has the authorization to given plan. */
-  public TSStatus checkAuthority(PhysicalPlan plan, IClientSession session) {
-    try {
-      if (!checkAuthorization(plan, session.getUsername())) {
-        return RpcUtils.getStatus(
-            TSStatusCode.NO_PERMISSION,
-            "No permissions for this operation, please add privilege "
-                + PrivilegeType.values()[
-                    AuthorityChecker.translateToPermissionId(plan.getOperatorType())]);
-      }
-    } catch (AuthException e) {
-      LOGGER.warn("meet error while checking authorization.", e);
-      return RpcUtils.getStatus(TSStatusCode.UNINITIALIZED_AUTH_ERROR, e.getMessage());
-    } catch (Exception e) {
-      return onQueryException(
-          e, OperationType.CHECK_AUTHORITY.getName(), TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    }
-    return null;
-  }
-
   /**
    * this method can be only used in client-thread model.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
deleted file mode 100644
index 54248476aa..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ /dev/null
@@ -1,1430 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service.thrift.impl;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.auth.AuthException;
-import org.apache.iotdb.commons.auth.authorizer.IAuthorizer;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.service.metric.MetricService;
-import org.apache.iotdb.commons.service.metric.enums.Metric;
-import org.apache.iotdb.commons.service.metric.enums.Operation;
-import org.apache.iotdb.commons.utils.PathUtils;
-import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.auth.AuthorizerManager;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.OperationType;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
-import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
-import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.query.control.clientsession.IClientSession;
-import org.apache.iotdb.db.query.control.tracing.TracingConstant;
-import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
-import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
-import org.apache.iotdb.db.query.pool.QueryTaskManager;
-import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.service.StaticResps;
-import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
-import org.apache.iotdb.db.service.basic.ServiceProvider;
-import org.apache.iotdb.db.sync.SyncService;
-import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
-import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
-import org.apache.iotdb.db.utils.QueryDataSetUtils;
-import org.apache.iotdb.metrics.utils.MetricLevel;
-import org.apache.iotdb.rpc.RedirectException;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
-import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
-import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
-import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
-import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.SQLException;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
-import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.TRACING_MANAGER;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
-
-/** Thrift RPC implementation at server side. */
-public class TSServiceImpl implements IClientRPCServiceWithHandler {
-
-  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
-
-  private class QueryTask implements Callable<TSExecuteStatementResp> {
-
-    private final PhysicalPlan plan;
-    private final long queryStartTime;
-    private final IClientSession session;
-    private final String statement;
-    private final long statementId;
-    private final long timeout;
-    private final int fetchSize;
-    private final boolean isJdbcQuery;
-    private final boolean enableRedirectQuery;
-
-    /**
-     * Execute query statement, return TSExecuteStatementResp with dataset.
-     *
-     * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan
-     */
-    public QueryTask(
-        PhysicalPlan plan,
-        long queryStartTime,
-        IClientSession session,
-        String statement,
-        long statementId,
-        long timeout,
-        int fetchSize,
-        boolean isJdbcQuery,
-        boolean enableRedirectQuery) {
-      this.plan = plan;
-      this.queryStartTime = queryStartTime;
-      this.session = session;
-      this.statement = statement;
-      this.statementId = statementId;
-      this.timeout = timeout;
-      this.fetchSize = fetchSize;
-      this.isJdbcQuery = isJdbcQuery;
-      this.enableRedirectQuery = enableRedirectQuery;
-    }
-
-    @Override
-    public TSExecuteStatementResp call() throws Exception {
-      return null;
-    }
-  }
-
-  private class FetchResultsTask implements Callable<TSFetchResultsResp> {
-
-    private final IClientSession session;
-    private final long queryId;
-    private final int fetchSize;
-    private final boolean isAlign;
-
-    public FetchResultsTask(IClientSession session, long queryId, int fetchSize, boolean isAlign) {
-      this.session = session;
-      this.queryId = queryId;
-      this.fetchSize = fetchSize;
-      this.isAlign = isAlign;
-    }
-
-    @Override
-    public TSFetchResultsResp call() throws Exception {
-      return null;
-    }
-  }
-
-  // main logger
-  private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceImpl.class);
-
-  private static final String INFO_INTERRUPT_ERROR =
-      "Current Thread interrupted when dealing with request {}";
-
-  protected final ServiceProvider serviceProvider;
-
-  public TSServiceImpl() {
-    super();
-    serviceProvider = IoTDB.serviceProvider;
-  }
-
-  @Override
-  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
-      throws TException {
-    return null;
-  }
-
-  @Override
-  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
-      throws TException {
-    return null;
-  }
-
-  @Override
-  public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) throws TException {
-    return null;
-  }
-
-  @Override
-  public TSExecuteStatementResp executeRawDataQueryV2(TSRawDataQueryReq req) throws TException {
-    return null;
-  }
-
-  @Override
-  public TSExecuteStatementResp executeLastDataQueryV2(TSLastDataQueryReq req) throws TException {
-    return null;
-  }
-
-  @Override
-  public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) throws TException {
-    return null;
-  }
-
-  @Override
-  public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
-    IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
-    BasicOpenSessionResp openSessionResp =
-        SESSION_MANAGER.login(
-            SESSION_MANAGER.getCurrSession(),
-            req.username,
-            req.password,
-            req.zoneId,
-            req.client_protocol,
-            clientVersion);
-    TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
-    TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
-    return resp.setSessionId(openSessionResp.getSessionId());
-  }
-
-  private IoTDBConstant.ClientVersion parseClientVersion(TSOpenSessionReq req) {
-    Map<String, String> configuration = req.configuration;
-    if (configuration != null && configuration.containsKey("version")) {
-      return IoTDBConstant.ClientVersion.valueOf(configuration.get("version"));
-    }
-    return IoTDBConstant.ClientVersion.V_0_12;
-  }
-
-  @Override
-  public TSStatus closeSession(TSCloseSessionReq req) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public TSStatus cancelOperation(TSCancelOperationReq req) {
-    // TODO implement
-    return RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "Cancellation is not implemented");
-  }
-
-  @Override
-  public TSStatus closeOperation(TSCloseOperationReq req) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
-    TSFetchMetadataResp resp = new TSFetchMetadataResp();
-
-    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-      return resp.setStatus(getNotLoggedInStatus());
-    }
-
-    TSStatus status;
-    try {
-      switch (req.getType()) {
-        case "METADATA_IN_JSON":
-          resp.setMetadataInJson("{}");
-          status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-          break;
-        case "COLUMN":
-          resp.setDataType(getSeriesTypeByPath(new PartialPath(req.getColumnPath())).toString());
-          status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-          break;
-        case "ALL_COLUMNS":
-          resp.setColumnsList(
-              getPaths(new PartialPath(req.getColumnPath())).stream()
-                  .map(PartialPath::getFullPath)
-                  .collect(Collectors.toList()));
-          status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-          break;
-        default:
-          status = RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, req.getType());
-          break;
-      }
-    } catch (MetadataException e) {
-      LOGGER.error(
-          String.format("Failed to fetch timeseries %s's metadata", req.getColumnPath()), e);
-      status = RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
-    } catch (Exception e) {
-      status =
-          onNPEOrUnexpectedException(
-              e, OperationType.FETCH_METADATA, TSStatusCode.INTERNAL_SERVER_ERROR);
-    }
-    return resp.setStatus(status);
-  }
-
-  protected List<MeasurementPath> getPaths(PartialPath path) throws MetadataException {
-    return IoTDB.schemaProcessor.getMeasurementPaths(path);
-  }
-
-  protected TSDataType getSeriesTypeByPath(PartialPath path) throws MetadataException {
-    return IoTDB.schemaProcessor.getSeriesType(path);
-  }
-
-  @Override
-  public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
-    return null;
-  }
-
-  @Override
-  public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
-    String statement = req.getStatement();
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
-      }
-
-      long startTime = System.currentTimeMillis();
-      PhysicalPlan physicalPlan =
-          serviceProvider
-              .getPlanner()
-              .parseSQLToPhysicalPlan(
-                  statement,
-                  SESSION_MANAGER.getCurrSession().getZoneId(),
-                  SESSION_MANAGER.getCurrSession().getClientVersion());
-
-      if (physicalPlan.isQuery()) {
-        return submitQueryTask(physicalPlan, startTime, req);
-      } else {
-        return executeUpdateStatement(
-            statement,
-            req.statementId,
-            physicalPlan,
-            req.fetchSize,
-            req.timeout,
-            req.getSessionId());
-      }
-    } catch (InterruptedException e) {
-      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
-      Thread.currentThread().interrupt();
-      return RpcUtils.getTSExecuteStatementResp(
-          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
-    } catch (Exception e) {
-      return RpcUtils.getTSExecuteStatementResp(
-          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
-    }
-  }
-
-  @Override
-  public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
-      }
-
-      long startTime = System.currentTimeMillis();
-      String statement = req.getStatement();
-      PhysicalPlan physicalPlan =
-          serviceProvider
-              .getPlanner()
-              .parseSQLToPhysicalPlan(
-                  statement,
-                  SESSION_MANAGER.getCurrSession().getZoneId(),
-                  SESSION_MANAGER.getCurrSession().getClientVersion());
-
-      if (physicalPlan.isQuery()) {
-        return submitQueryTask(physicalPlan, startTime, req);
-      } else {
-        return RpcUtils.getTSExecuteStatementResp(
-            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
-      }
-    } catch (InterruptedException e) {
-      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
-      Thread.currentThread().interrupt();
-      return RpcUtils.getTSExecuteStatementResp(
-          onQueryException(
-              e, "\"" + req.getStatement() + "\". " + OperationType.EXECUTE_QUERY_STATEMENT));
-    } catch (Exception e) {
-      return RpcUtils.getTSExecuteStatementResp(
-          onQueryException(
-              e, "\"" + req.getStatement() + "\". " + OperationType.EXECUTE_QUERY_STATEMENT));
-    }
-  }
-
-  @Override
-  public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
-      }
-
-      long startTime = System.currentTimeMillis();
-      PhysicalPlan physicalPlan =
-          serviceProvider
-              .getPlanner()
-              .rawDataQueryReqToPhysicalPlan(
-                  req,
-                  SESSION_MANAGER.getCurrSession().getZoneId(),
-                  SESSION_MANAGER.getCurrSession().getClientVersion());
-
-      if (physicalPlan.isQuery()) {
-        Future<TSExecuteStatementResp> resp =
-            QueryTaskManager.getInstance()
-                .submit(
-                    new QueryTask(
-                        physicalPlan,
-                        startTime,
-                        SESSION_MANAGER.getCurrSession(),
-                        "",
-                        req.statementId,
-                        CONFIG.getQueryTimeoutThreshold(),
-                        req.fetchSize,
-                        req.isJdbcQuery(),
-                        req.enableRedirectQuery));
-        return resp.get();
-      } else {
-        return RpcUtils.getTSExecuteStatementResp(
-            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
-      }
-    } catch (InterruptedException e) {
-      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
-      Thread.currentThread().interrupt();
-      return RpcUtils.getTSExecuteStatementResp(
-          onQueryException(e, OperationType.EXECUTE_RAW_DATA_QUERY));
-    } catch (Exception e) {
-      return RpcUtils.getTSExecuteStatementResp(
-          onQueryException(e, OperationType.EXECUTE_RAW_DATA_QUERY));
-    }
-  }
-
-  @Override
-  public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
-      }
-
-      long startTime = System.currentTimeMillis();
-      PhysicalPlan physicalPlan =
-          serviceProvider
-              .getPlanner()
-              .lastDataQueryReqToPhysicalPlan(
-                  req,
-                  SESSION_MANAGER.getCurrSession().getZoneId(),
-                  SESSION_MANAGER.getCurrSession().getClientVersion());
-
-      if (physicalPlan.isQuery()) {
-        Future<TSExecuteStatementResp> resp =
-            QueryTaskManager.getInstance()
-                .submit(
-                    new QueryTask(
-                        physicalPlan,
-                        startTime,
-                        SESSION_MANAGER.getCurrSession(),
-                        "",
-                        req.statementId,
-                        CONFIG.getQueryTimeoutThreshold(),
-                        req.fetchSize,
-                        req.isJdbcQuery(),
-                        req.enableRedirectQuery));
-        return resp.get();
-      } else {
-        return RpcUtils.getTSExecuteStatementResp(
-            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
-      }
-    } catch (InterruptedException e) {
-      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
-      Thread.currentThread().interrupt();
-      return RpcUtils.getTSExecuteStatementResp(
-          onQueryException(e, OperationType.EXECUTE_LAST_DATA_QUERY));
-    } catch (Exception e) {
-      return RpcUtils.getTSExecuteStatementResp(
-          onQueryException(e, OperationType.EXECUTE_LAST_DATA_QUERY));
-    }
-  }
-
-  private TSExecuteStatementResp submitQueryTask(
-      PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req) throws Exception {
-    TSStatus status =
-        SESSION_MANAGER.checkAuthority(physicalPlan, SESSION_MANAGER.getCurrSession());
-    if (status != null) {
-      return new TSExecuteStatementResp(status);
-    }
-    QueryTask queryTask =
-        new QueryTask(
-            physicalPlan,
-            startTime,
-            SESSION_MANAGER.getCurrSession(),
-            req.statement,
-            req.statementId,
-            req.timeout,
-            req.fetchSize,
-            req.jdbcQuery,
-            req.enableRedirectQuery);
-    TSExecuteStatementResp resp;
-    if (physicalPlan instanceof ShowQueryProcesslistPlan) {
-      resp = queryTask.call();
-    } else {
-      resp = QueryTaskManager.getInstance().submit(queryTask).get();
-    }
-    return resp;
-  }
-
-  private TSExecuteStatementResp executeQueryPlan(
-      QueryPlan plan, QueryContext context, boolean isJdbcQuery, int fetchSize, String username)
-      throws TException, MetadataException, QueryProcessException, StorageEngineException,
-          SQLException, IOException, InterruptedException, QueryFilterOptimizationException,
-          AuthException {
-    // check permissions
-    List<? extends PartialPath> authPaths = plan.getAuthPaths();
-    if (authPaths != null
-        && !authPaths.isEmpty()
-        && !SESSION_MANAGER.checkAuthorization(plan, username)) {
-      return RpcUtils.getTSExecuteStatementResp(
-          RpcUtils.getStatus(
-              TSStatusCode.NO_PERMISSION,
-              "No permissions for this operation, please add privilege "
-                  + OperatorType.values()[
-                      AuthorityChecker.translateToPermissionId(plan.getOperatorType())]));
-    }
-
-    long queryId = context.getQueryId();
-    if (plan.isEnableTracing()) {
-      context.setEnableTracing(true);
-      TRACING_MANAGER.setStartTime(queryId, context.getStartTime(), context.getStatement());
-      TRACING_MANAGER.registerActivity(
-          queryId, TracingConstant.ACTIVITY_PARSE_SQL, System.currentTimeMillis());
-      TRACING_MANAGER.setSeriesPathNum(queryId, plan.getPaths().size());
-    }
-
-    TSExecuteStatementResp resp = null;
-    // execute it before createDataSet since it may change the content of query plan
-    if (!(plan instanceof UDFPlan)) {
-      resp = plan.getTSExecuteStatementResp(isJdbcQuery);
-    }
-    // create and cache dataset
-    QueryDataSet newDataSet = serviceProvider.createQueryDataSet(context, plan, fetchSize);
-
-    if (plan.isEnableTracing()) {
-      TRACING_MANAGER.registerActivity(
-          queryId, TracingConstant.ACTIVITY_CREATE_DATASET, System.currentTimeMillis());
-    }
-
-    if (newDataSet.getEndPoint() != null && plan.isEnableRedirect()) {
-      QueryDataSet.EndPoint endPoint = newDataSet.getEndPoint();
-      return redirectQueryToAnotherNode(resp, context, endPoint.getIp(), endPoint.getPort());
-    }
-
-    if (plan instanceof UDFPlan || plan.isGroupByLevel()) {
-      resp = plan.getTSExecuteStatementResp(isJdbcQuery);
-    }
-
-    if (newDataSet instanceof DirectNonAlignDataSet) {
-      resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
-    } else {
-      try {
-        TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
-        resp.setQueryDataSet(tsQueryDataSet);
-      } catch (RedirectException e) {
-        if (plan.isEnableRedirect()) {
-          TEndPoint endPoint = e.getEndPoint();
-          return redirectQueryToAnotherNode(resp, context, endPoint.ip, endPoint.port);
-        } else {
-          LOGGER.error(
-              "execute {} error, if session does not support redirect, should not throw redirection exception.",
-              context.getStatement(),
-              e);
-        }
-      }
-    }
-
-    if (plan.isEnableTracing()) {
-      TRACING_MANAGER.registerActivity(
-          queryId, TracingConstant.ACTIVITY_REQUEST_COMPLETE, System.currentTimeMillis());
-      TSTracingInfo tsTracingInfo = fillRpcReturnTracingInfo(queryId);
-      resp.setTracingInfo(tsTracingInfo);
-    }
-    return resp;
-  }
-
-  private TSExecuteStatementResp executeShowOrAuthorPlan(
-      PhysicalPlan plan, QueryContext context, int fetchSize, String username)
-      throws QueryProcessException, TException, StorageEngineException, SQLException, IOException,
-          InterruptedException, QueryFilterOptimizationException, MetadataException, AuthException {
-    // create and cache dataset
-    QueryDataSet newDataSet = serviceProvider.createQueryDataSet(context, plan, fetchSize);
-    TSExecuteStatementResp resp = getListDataSetResp(plan, newDataSet);
-
-    resp.setQueryDataSet(fillRpcReturnData(fetchSize, newDataSet, username));
-    return resp;
-  }
-
-  /**
-   * Construct TSExecuteStatementResp and the header of list result set.
-   *
-   * @param plan maybe ShowPlan or AuthorPlan
-   */
-  private TSExecuteStatementResp getListDataSetResp(PhysicalPlan plan, QueryDataSet dataSet) {
-    TSExecuteStatementResp resp =
-        StaticResps.getNoTimeExecuteResp(
-            dataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()),
-            dataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
-    if (plan instanceof ShowQueryProcesslistPlan) {
-      resp.setIgnoreTimeStamp(false);
-    }
-    return resp;
-  }
-
-  /** Redirect query */
-  private TSExecuteStatementResp redirectQueryToAnotherNode(
-      TSExecuteStatementResp resp, QueryContext context, String ip, int port) {
-    LOGGER.debug(
-        "need to redirect {} {} to node {}:{}",
-        context.getStatement(),
-        context.getQueryId(),
-        ip,
-        port);
-    TSStatus status = new TSStatus();
-    status.setRedirectNode(new TEndPoint(ip, port));
-    status.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
-    resp.setStatus(status);
-    resp.setQueryId(context.getQueryId());
-    return resp;
-  }
-
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  @Override
-  public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
-      }
-
-      Future<TSFetchResultsResp> resp =
-          QueryTaskManager.getInstance()
-              .submit(
-                  new FetchResultsTask(
-                      SESSION_MANAGER.getCurrSession(), req.queryId, req.fetchSize, req.isAlign));
-      return resp.get();
-    } catch (InterruptedException e) {
-      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
-      Thread.currentThread().interrupt();
-      return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
-    } catch (Exception e) {
-      return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
-    }
-  }
-
-  private TSQueryDataSet fillRpcReturnData(
-      int fetchSize, QueryDataSet queryDataSet, String userName)
-      throws TException, AuthException, IOException, InterruptedException, QueryProcessException {
-    WatermarkEncoder encoder = getWatermarkEncoder(userName);
-    return queryDataSet instanceof DirectAlignByTimeDataSet
-        ? ((DirectAlignByTimeDataSet) queryDataSet).fillBuffer(fetchSize, encoder)
-        : QueryDataSetUtils.convertQueryDataSetByFetchSize(queryDataSet, fetchSize, encoder);
-  }
-
-  private TSQueryNonAlignDataSet fillRpcNonAlignReturnData(
-      int fetchSize, QueryDataSet queryDataSet, String userName)
-      throws TException, AuthException, IOException, QueryProcessException, InterruptedException {
-    WatermarkEncoder encoder = getWatermarkEncoder(userName);
-    return ((DirectNonAlignDataSet) queryDataSet).fillBuffer(fetchSize, encoder);
-  }
-
-  private TSTracingInfo fillRpcReturnTracingInfo(long queryId) {
-    return TRACING_MANAGER.fillRpcReturnTracingInfo(queryId);
-  }
-
-  private WatermarkEncoder getWatermarkEncoder(String userName) throws TException, AuthException {
-    IAuthorizer authorizer = AuthorizerManager.getInstance();
-
-    WatermarkEncoder encoder = null;
-    if (CONFIG.isEnableWatermark() && authorizer.isUserUseWaterMark(userName)) {
-      if (CONFIG.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
-        encoder = new GroupedLSBWatermarkEncoder(CONFIG);
-      } else {
-        throw new UnSupportedDataTypeException(
-            String.format(
-                "Watermark method is not supported yet: %s", CONFIG.getWatermarkMethodName()));
-      }
-    }
-    return encoder;
-  }
-
-  /** update statement can be: 1. select-into statement 2. non-query statement */
-  @Override
-  public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
-    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
-    }
-
-    try {
-      PhysicalPlan physicalPlan =
-          serviceProvider
-              .getPlanner()
-              .parseSQLToPhysicalPlan(
-                  req.statement,
-                  SESSION_MANAGER.getCurrSession().getZoneId(),
-                  SESSION_MANAGER.getCurrSession().getClientVersion());
-      return physicalPlan.isQuery()
-          ? RpcUtils.getTSExecuteStatementResp(
-              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
-          : executeUpdateStatement(
-              req.statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              req.timeout,
-              req.getSessionId());
-    } catch (InterruptedException e) {
-      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
-      Thread.currentThread().interrupt();
-      return RpcUtils.getTSExecuteStatementResp(
-          onQueryException(
-              e, "\"" + req.statement + "\". " + OperationType.EXECUTE_UPDATE_STATEMENT));
-    } catch (Exception e) {
-      return RpcUtils.getTSExecuteStatementResp(
-          onQueryException(
-              e, "\"" + req.statement + "\". " + OperationType.EXECUTE_UPDATE_STATEMENT));
-    }
-  }
-
-  /** update statement can be: 1. select-into statement 2. non-query statement */
-  private TSExecuteStatementResp executeUpdateStatement(
-      String statement,
-      long statementId,
-      PhysicalPlan plan,
-      int fetchSize,
-      long timeout,
-      long sessionId)
-      throws InterruptedException {
-    throw new UnsupportedOperationException();
-  }
-
-  private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan) {
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-    return status != null
-        ? new TSExecuteStatementResp(status)
-        : RpcUtils.getTSExecuteStatementResp(executeNonQueryPlan(plan))
-            .setQueryId(SESSION_MANAGER.requestQueryId());
-  }
-
-  @Override
-  public void handleClientExit() {
-    IClientSession session = SESSION_MANAGER.getCurrSession();
-    if (session != null) {
-      TSCloseSessionReq req = new TSCloseSessionReq();
-      closeSession(req);
-    }
-    SyncService.getInstance().handleClientExit();
-  }
-
-  @Override
-  public TSGetTimeZoneResp getTimeZone(long sessionId) {
-    try {
-      ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId();
-      return new TSGetTimeZoneResp(
-          RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
-          zoneId != null ? zoneId.toString() : "Unknown time zone");
-    } catch (Exception e) {
-      return new TSGetTimeZoneResp(
-          onNPEOrUnexpectedException(
-              e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR),
-          "Unknown time zone");
-    }
-  }
-
-  @Override
-  public TSStatus setTimeZone(TSSetTimeZoneReq req) {
-    try {
-      SESSION_MANAGER.getCurrSession().setZoneId(ZoneId.of(req.timeZone));
-      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR);
-    }
-  }
-
-  @Override
-  public ServerProperties getProperties() {
-    ServerProperties properties = new ServerProperties();
-    properties.setVersion(IoTDBConstant.VERSION);
-    properties.setBuildInfo(IoTDBConstant.BUILD_INFO);
-    LOGGER.info("IoTDB server version: {}", IoTDBConstant.VERSION_WITH_BUILD);
-    properties.setSupportedTimeAggregationOperations(new ArrayList<>());
-    properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME);
-    properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME);
-    properties.setTimestampPrecision(
-        IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
-    properties.setMaxConcurrentClientNum(
-        IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum());
-    properties.setWatermarkSecretKey(
-        IoTDBDescriptor.getInstance().getConfig().getWatermarkSecretKey());
-    properties.setWatermarkBitString(
-        IoTDBDescriptor.getInstance().getConfig().getWatermarkBitString());
-    properties.setWatermarkParamMarkRate(
-        IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMarkRate());
-    properties.setWatermarkParamMaxRightBit(
-        IoTDBDescriptor.getInstance().getConfig().getWatermarkParamMaxRightBit());
-    properties.setIsReadOnly(CommonDescriptor.getInstance().getConfig().isReadOnly());
-    properties.setThriftMaxFrameSize(
-        IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize());
-    return properties;
-  }
-
-  @Override
-  public TSStatus insertRecords(TSInsertRecordsReq req) {
-    return null;
-  }
-
-  private TSStatus judgeFinalTsStatus(
-      boolean allCheckSuccess,
-      TSStatus executeTsStatus,
-      Map<Integer, TSStatus> checkTsStatus,
-      int totalRowCount) {
-
-    if (allCheckSuccess) {
-      return executeTsStatus;
-    }
-
-    if (executeTsStatus.subStatus == null) {
-      TSStatus[] tmpSubTsStatus = new TSStatus[totalRowCount];
-      Arrays.fill(tmpSubTsStatus, RpcUtils.SUCCESS_STATUS);
-      executeTsStatus.subStatus = Arrays.asList(tmpSubTsStatus);
-    }
-    for (Entry<Integer, TSStatus> entry : checkTsStatus.entrySet()) {
-      executeTsStatus.subStatus.set(entry.getKey(), entry.getValue());
-    }
-    return RpcUtils.getStatus(executeTsStatus.subStatus);
-  }
-
-  @Override
-  public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
-    return null;
-  }
-
-  @Override
-  public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
-    return null;
-  }
-
-  @Override
-  public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
-    return null;
-  }
-
-  @Override
-  public TSStatus testInsertTablet(TSInsertTabletReq req) {
-    LOGGER.debug("Test insert batch request receive.");
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  @Override
-  public TSStatus testInsertTablets(TSInsertTabletsReq req) {
-    LOGGER.debug("Test insert batch request receive.");
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  @Override
-  public TSStatus testInsertRecord(TSInsertRecordReq req) {
-    LOGGER.debug("Test insert row request receive.");
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  @Override
-  public TSStatus testInsertStringRecord(TSInsertStringRecordReq req) {
-    LOGGER.debug("Test insert string record request receive.");
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  @Override
-  public TSStatus testInsertRecords(TSInsertRecordsReq req) {
-    LOGGER.debug("Test insert row in batch request receive.");
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  @Override
-  public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
-    LOGGER.debug("Test insert rows in batch request receive.");
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  @Override
-  public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) {
-    LOGGER.debug("Test insert string records request receive.");
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  @Override
-  public TSStatus insertRecord(TSInsertRecordReq req) {
-    return null;
-  }
-
-  @Override
-  public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
-    return null;
-  }
-
-  @Override
-  public TSStatus deleteData(TSDeleteDataReq req) {
-    return null;
-  }
-
-  @Override
-  public TSStatus insertTablet(TSInsertTabletReq req) {
-    return null;
-  }
-
-  @Override
-  public TSStatus insertTablets(TSInsertTabletsReq req) {
-    long t1 = System.currentTimeMillis();
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return getNotLoggedInStatus();
-      }
-
-      return insertTabletsInternally(req);
-    } catch (IoTDBException e) {
-      return onIoTDBException(e, OperationType.INSERT_TABLETS, e.getErrorCode());
-    } catch (NullPointerException e) {
-      LOGGER.error("{}: error occurs when insertTablets", IoTDBConstant.GLOBAL_DB_NAME, e);
-      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } finally {
-      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
-    }
-  }
-
-  public TSStatus insertTabletsInternally(TSInsertTabletsReq req) throws MetadataException {
-    return null;
-  }
-
-  @Override
-  public TSStatus setStorageGroup(long sessionId, String storageGroup) {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return getNotLoggedInStatus();
-      }
-
-      SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup));
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-
-      return status != null ? status : executeNonQueryPlan(plan);
-    } catch (IoTDBException e) {
-      return onIoTDBException(e, OperationType.SET_STORAGE_GROUP, e.getErrorCode());
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.SET_STORAGE_GROUP, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    }
-  }
-
-  @Override
-  public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return getNotLoggedInStatus();
-      }
-
-      List<PartialPath> storageGroupList = new ArrayList<>();
-      for (String storageGroup : storageGroups) {
-        storageGroupList.add(new PartialPath(storageGroup));
-      }
-      DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-      return status != null ? status : executeNonQueryPlan(plan);
-    } catch (IoTDBException e) {
-      return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode());
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.DELETE_STORAGE_GROUPS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    }
-  }
-
-  @Override
-  public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSession(), req.getPath());
-      }
-
-      // measurementAlias is also a nodeName
-      PathUtils.checkIsLegalSingleMeasurementsAndUpdate(
-          Collections.singletonList(req.getMeasurementAlias()));
-      CreateTimeSeriesPlan plan =
-          new CreateTimeSeriesPlan(
-              new PartialPath(req.path),
-              TSDataType.values()[req.dataType],
-              TSEncoding.values()[req.encoding],
-              CompressionType.deserialize((byte) req.compressor),
-              req.props,
-              req.tags,
-              req.attributes,
-              req.measurementAlias);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-      return status != null ? status : executeNonQueryPlan(plan);
-    } catch (IoTDBException e) {
-      return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode());
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    }
-  }
-
-  @Override
-  public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return getNotLoggedInStatus();
-      }
-
-      // check whether measurement is legal according to syntax convention
-
-      PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurements());
-
-      PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurementAlias());
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session-{} create aligned timeseries {}.{}",
-            SESSION_MANAGER.getCurrSession(),
-            req.getPrefixPath(),
-            req.getMeasurements());
-      }
-
-      List<TSDataType> dataTypes = new ArrayList<>();
-      for (int dataType : req.dataTypes) {
-        dataTypes.add(TSDataType.values()[dataType]);
-      }
-      List<TSEncoding> encodings = new ArrayList<>();
-      for (int encoding : req.encodings) {
-        encodings.add(TSEncoding.values()[encoding]);
-      }
-      List<CompressionType> compressors = new ArrayList<>();
-      for (int compressor : req.compressors) {
-        compressors.add(CompressionType.deserialize((byte) compressor));
-      }
-
-      CreateAlignedTimeSeriesPlan plan =
-          new CreateAlignedTimeSeriesPlan(
-              new PartialPath(req.prefixPath),
-              req.measurements,
-              dataTypes,
-              encodings,
-              compressors,
-              req.measurementAlias,
-              req.tagsList,
-              req.attributesList);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-      return status != null ? status : executeNonQueryPlan(plan);
-    } catch (IoTDBException e) {
-      return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode());
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.CREATE_ALIGNED_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    }
-  }
-
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  @Override
-  public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session-{} create {} timeseries, the first is {}",
-            SESSION_MANAGER.getCurrSession(),
-            req.getPaths().size(),
-            req.getPaths().get(0));
-      }
-
-      // measurementAlias is also a nodeName
-
-      PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.measurementAliasList);
-
-      CreateMultiTimeSeriesPlan multiPlan = new CreateMultiTimeSeriesPlan();
-      List<PartialPath> paths = new ArrayList<>(req.paths.size());
-      List<TSDataType> dataTypes = new ArrayList<>(req.dataTypes.size());
-      List<TSEncoding> encodings = new ArrayList<>(req.dataTypes.size());
-      List<CompressionType> compressors = new ArrayList<>(req.paths.size());
-      List<String> alias = null;
-      if (req.measurementAliasList != null) {
-        alias = new ArrayList<>(req.paths.size());
-      }
-      List<Map<String, String>> props = null;
-      if (req.propsList != null) {
-        props = new ArrayList<>(req.paths.size());
-      }
-      List<Map<String, String>> tags = null;
-      if (req.tagsList != null) {
-        tags = new ArrayList<>(req.paths.size());
-      }
-      List<Map<String, String>> attributes = null;
-      if (req.attributesList != null) {
-        attributes = new ArrayList<>(req.paths.size());
-      }
-
-      // for authority check
-      CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
-      for (int i = 0; i < req.paths.size(); i++) {
-        plan.setPath(new PartialPath(req.paths.get(i)));
-        TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-        if (status != null) {
-          // not authorized
-          multiPlan.getResults().put(i, status);
-        }
-
-        paths.add(new PartialPath(req.paths.get(i)));
-        compressors.add(CompressionType.deserialize(req.compressors.get(i).byteValue()));
-        if (alias != null) {
-          alias.add(req.measurementAliasList.get(i));
-        }
-        if (props != null) {
-          props.add(req.propsList.get(i));
-        }
-        if (tags != null) {
-          tags.add(req.tagsList.get(i));
-        }
-        if (attributes != null) {
-          attributes.add(req.attributesList.get(i));
-        }
-      }
-      for (int i = 0; i < req.dataTypes.size(); i++) {
-        dataTypes.add(TSDataType.values()[req.dataTypes.get(i)]);
-        encodings.add(TSEncoding.values()[req.encodings.get(i)]);
-      }
-
-      multiPlan.setPaths(paths);
-      multiPlan.setDataTypes(dataTypes);
-      multiPlan.setEncodings(encodings);
-      multiPlan.setCompressors(compressors);
-      multiPlan.setAlias(alias);
-      multiPlan.setProps(props);
-      multiPlan.setTags(tags);
-      multiPlan.setAttributes(attributes);
-      multiPlan.setIndexes(new ArrayList<>());
-
-      return executeNonQueryPlan(multiPlan);
-    } catch (IoTDBException e) {
-      return onIoTDBException(e, OperationType.CREATE_MULTI_TIMESERIES, e.getErrorCode());
-    } catch (Exception e) {
-      LOGGER.error("creating multi timeseries fails", e);
-      return onNPEOrUnexpectedException(
-          e, OperationType.CREATE_MULTI_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    }
-  }
-
-  @Override
-  public TSStatus deleteTimeseries(long sessionId, List<String> paths) {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return getNotLoggedInStatus();
-      }
-
-      List<PartialPath> pathList = new ArrayList<>();
-      for (String path : paths) {
-        pathList.add(new PartialPath(path));
-      }
-      DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-      return status != null ? status : executeNonQueryPlan(plan);
-    } catch (IoTDBException e) {
-      return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode());
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.DELETE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    }
-  }
-
-  @Override
-  public long requestStatementId(long sessionId) {
-    return SESSION_MANAGER.requestStatementId(SESSION_MANAGER.getCurrSession());
-  }
-
-  @Override
-  public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException {
-    try {
-      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-        return getNotLoggedInStatus();
-      }
-
-      if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session-{} create schema template {}",
-            SESSION_MANAGER.getCurrSession(),
-            req.getName());
-      }
-
-      CreateTemplatePlan plan;
-      // Construct plan from serialized request
-      ByteBuffer buffer = ByteBuffer.wrap(req.getSerializedTemplate());
-      plan = CreateTemplatePlan.deserializeFromReq(buffer);
-      // check whether measurement is legal according to syntax convention
-      PathUtils.checkIsLegalMeasurementListsAndUpdate(plan.getMeasurements());
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-
-      return status != null ? status : executeNonQueryPlan(plan);
-    } catch (IoTDBException e) {
-      return onIoTDBException(e, OperationType.CREATE_SCHEMA_TEMPLATE, e.getErrorCode());
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, OperationType.CREATE_SCHEMA_TEMPLATE, TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    }
-  }
-
-  @Override
-  public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
-    try {
-      // check whether measurement is legal according to syntax convention
-      PathUtils.checkIsLegalMeasurementsAndUpdate(req.getMeasurements());
-    } catch (IoTDBException e) {
-      onIoTDBException(e, OperationType.EXECUTE_NON_QUERY_PLAN, e.getErrorCode());
-    }
-
-    int size = req.getMeasurementsSize();
-    String[] measurements = new String[size];
-    TSDataType[] dataTypes = new TSDataType[size];
-    TSEncoding[] encodings = new TSEncoding[size];
-    CompressionType[] compressionTypes = new CompressionType[size];
-
-    for (int i = 0; i < req.getDataTypesSize(); i++) {
-      measurements[i] = req.getMeasurements().get(i);
-      dataTypes[i] = TSDataType.values()[req.getDataTypes().get(i)];
-      encodings[i] = TSEncoding.values()[req.getEncodings().get(i)];
-      compressionTypes[i] = CompressionType.deserialize(req.getCompressors().get(i).byteValue());
-    }
-
-    AppendTemplatePlan plan =
-        new AppendTemplatePlan(
-            req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes);
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-    return status != null ? status : executeNonQueryPlan(plan);
-  }
-
-  @Override
-  public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
-    PruneTemplatePlan plan =
-        new PruneTemplatePlan(req.getName(), Collections.singletonList(req.getPath()));
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-    return status != null ? status : executeNonQueryPlan(plan);
-  }
-
-  @Override
-  public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
-    return null;
-  }
-
-  @Override
-  public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
-    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-      return getNotLoggedInStatus();
-    }
-
-    if (AUDIT_LOGGER.isDebugEnabled()) {
-      AUDIT_LOGGER.debug(
-          "Session-{} set device template {}.{}",
-          SESSION_MANAGER.getCurrSession(),
-          req.getTemplateName(),
-          req.getPrefixPath());
-    }
-
-    try {
-      SetTemplatePlan plan = new SetTemplatePlan(req.templateName, req.prefixPath);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-      return status != null ? status : executeNonQueryPlan(plan);
-    } catch (IllegalPathException e) {
-      return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
-    }
-  }
-
-  @Override
-  public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
-    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-      return getNotLoggedInStatus();
-    }
-
-    if (AUDIT_LOGGER.isDebugEnabled()) {
-      AUDIT_LOGGER.debug(
-          "Session-{} unset schema template {}.{}",
-          SESSION_MANAGER.getCurrSession(),
-          req.getPrefixPath(),
-          req.getTemplateName());
-    }
-
-    try {
-      UnsetTemplatePlan plan = new UnsetTemplatePlan(req.prefixPath, req.templateName);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-      return status != null ? status : executeNonQueryPlan(plan);
-    } catch (IllegalPathException e) {
-      return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
-    }
-  }
-
-  @Override
-  public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
-    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-      return getNotLoggedInStatus();
-    }
-
-    if (AUDIT_LOGGER.isDebugEnabled()) {
-      AUDIT_LOGGER.debug(
-          "Session-{} drop schema template {}.",
-          SESSION_MANAGER.getCurrSession(),
-          req.getTemplateName());
-    }
-
-    DropTemplatePlan plan = new DropTemplatePlan(req.templateName);
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
-    return status != null ? status : executeNonQueryPlan(plan);
-  }
-
-  @Override
-  public TSStatus handshake(TSyncIdentityInfo info) throws TException {
-    return SyncService.getInstance()
-        .handshake(info, SESSION_MANAGER.getCurrSession().getClientAddress(), null, null);
-  }
-
-  @Override
-  public TSStatus sendPipeData(ByteBuffer buff) throws TException {
-    return SyncService.getInstance().transportPipeData(buff);
-  }
-
-  @Override
-  public TSStatus sendFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff) throws TException {
-    return SyncService.getInstance().transportFile(metaInfo, buff);
-  }
-
-  @Override
-  public TSBackupConfigurationResp getBackupConfiguration() {
-    return new TSBackupConfigurationResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
-  }
-
-  @Override
-  public TSConnectionInfoResp fetchAllConnectionsInfo() {
-    throw new UnsupportedOperationException();
-  }
-
-  protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
-    try {
-      return serviceProvider.executeNonQuery(plan)
-          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
-          : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
-    } catch (Exception e) {
-      return onNonQueryException(e, OperationType.EXECUTE_NON_QUERY_PLAN);
-    }
-  }
-
-  private TSStatus getNotLoggedInStatus() {
-    return RpcUtils.getStatus(
-        TSStatusCode.NOT_LOGIN,
-        "Log in failed. Either you are not authorized or the session has timed out.");
-  }
-
-  /** Add stat of operation into metrics */
-  private void addOperationLatency(Operation operation, long startTime) {
-    MetricService.getInstance()
-        .histogram(
-            System.currentTimeMillis() - startTime,
-            Metric.OPERATION.toString(),
-            MetricLevel.IMPORTANT,
-            "name",
-            operation.getName());
-  }
-}