You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/04/22 07:28:04 UTC

[iotdb] 01/01: Cluster Test refine: Demo 1

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch feature/it-demo
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7d5490b9481dfc4232dfc7b9d58477b9ecbad15c
Author: ericpai <er...@hotmail.com>
AuthorDate: Fri Apr 22 15:26:14 2022 +0800

    Cluster Test refine: Demo 1
---
 checkstyle.xml                                     |  24 +-
 .../iotdb/cluster/server/ClusterTSServiceImpl.java |   7 +
 .../iotdb/integration/env/ClusterEnvBase.java      |  30 +-
 .../itbase/runtime/ClusterTestConnection.java      | 300 +++++++
 .../iotdb/itbase/runtime/ClusterTestResultSet.java | 865 +++++++++++++++++++++
 .../iotdb/itbase/runtime/ClusterTestStatement.java | 284 +++++++
 .../aggregation/IoTDBAggregationIT.java            |  34 +-
 .../db/service/thrift/impl/TSServiceImpl.java      | 219 +++---
 8 files changed, 1616 insertions(+), 147 deletions(-)

diff --git a/checkstyle.xml b/checkstyle.xml
index 8dd797112f..4d4eb175a9 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -1,5 +1,4 @@
-<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE module PUBLIC "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
-        "https://checkstyle.org/dtds/configuration_1_3.dtd">
+<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE module PUBLIC "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN" "https://checkstyle.org/dtds/configuration_1_3.dtd">
 
 <!--
 
@@ -45,10 +44,8 @@
         <module name="OuterTypeFilename"/>
         <module name="IllegalTokenText">
             <property name="tokens" value="STRING_LITERAL, CHAR_LITERAL"/>
-            <property name="format"
-                      value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/>
-            <property name="message"
-                      value="Consider using special escape sequence instead of octal value or Unicode escaped value."/>
+            <property name="format" value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/>
+            <property name="message" value="Consider using special escape sequence instead of octal value or Unicode escaped value."/>
         </module>
         <module name="AvoidEscapedUnicodeCharacters">
             <property name="allowEscapesForControlCharacters" value="true"/>
@@ -72,22 +69,19 @@
         <module name="LeftCurly"/>
         <module name="RightCurly">
             <property name="id" value="RightCurlySame"/>
-            <property name="tokens"
-                      value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE,                     LITERAL_DO"/>
+            <property name="tokens" value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE,                     LITERAL_DO"/>
         </module>
         <module name="RightCurly">
             <property name="id" value="RightCurlyAlone"/>
             <property name="option" value="alone"/>
-            <property name="tokens"
-                      value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT,                     INSTANCE_INIT"/>
+            <property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT,                     INSTANCE_INIT"/>
         </module>
         <module name="WhitespaceAround">
             <property name="allowEmptyConstructors" value="true"/>
             <property name="allowEmptyMethods" value="true"/>
             <property name="allowEmptyTypes" value="true"/>
             <property name="allowEmptyLoops" value="true"/>
-            <message key="ws.notFollowed"
-                     value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/>
+            <message key="ws.notFollowed" value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/>
             <message key="ws.notPreceded" value="WhitespaceAround: ''{0}'' is not preceded with whitespace."/>
         </module>
         <module name="OneStatementPerLine"/>
@@ -196,8 +190,7 @@
         <module name="ParenPad"/>
         <module name="OperatorWrap">
             <property name="option" value="NL"/>
-            <property name="tokens"
-                      value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR,                     LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/>
+            <property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR,                     LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/>
         </module>
         <module name="AnnotationLocation">
             <property name="id" value="AnnotationLocationMostCases"/>
@@ -211,8 +204,7 @@
         <module name="NonEmptyAtclauseDescription"/>
         <module name="JavadocTagContinuationIndentation"/>
         <module name="SummaryJavadoc">
-            <property name="forbiddenSummaryFragments"
-                      value="^@return the *|^This method returns |^A [{]@code [a-zA-Z0-9]+[}]( is a )"/>
+            <property name="forbiddenSummaryFragments" value="^@return the *|^This method returns |^A [{]@code [a-zA-Z0-9]+[}]( is a )"/>
         </module>
         <module name="JavadocParagraph"/>
         <module name="AtclauseOrder">
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
index 0c7528dc22..f740f6aaf0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.cluster.server;
 
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
 import org.apache.iotdb.cluster.server.basic.ClusterServiceProvider;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
 
@@ -44,4 +46,9 @@ public class ClusterTSServiceImpl extends TSServiceImpl {
   protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
     return clusterServiceProvider.executeNonQueryPlan(plan);
   }
+
+  @Override
+  public SessionManager getSessionManager() {
+    return ClusterSessionManager.getInstance(); // overrides the base service's manager
+  }
 }
diff --git a/integration/src/main/java/org/apache/iotdb/integration/env/ClusterEnvBase.java b/integration/src/main/java/org/apache/iotdb/integration/env/ClusterEnvBase.java
index b7ae7ff61c..06e153c72e 100644
--- a/integration/src/main/java/org/apache/iotdb/integration/env/ClusterEnvBase.java
+++ b/integration/src/main/java/org/apache/iotdb/integration/env/ClusterEnvBase.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.integration.env;
 
 import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.itbase.runtime.ClusterTestConnection;
 import org.apache.iotdb.jdbc.Config;
 import org.apache.iotdb.jdbc.Constant;
 import org.apache.iotdb.jdbc.IoTDBConnection;
@@ -37,7 +38,6 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.iotdb.jdbc.Config.VERSION;
 import static org.junit.Assert.fail;
 
 public abstract class ClusterEnvBase implements BaseEnv {
@@ -212,11 +212,11 @@ public abstract class ClusterEnvBase implements BaseEnv {
 
   @Override
   public Connection getConnection() throws SQLException {
-    Connection connection = null;
-
+    Connection writeConnection = null;
+    List<Connection> readConnections = new ArrayList<>();
     try {
       Class.forName(Config.JDBC_DRIVER_NAME);
-      connection =
+      Connection connection =
           DriverManager.getConnection(
               Config.IOTDB_URL_PREFIX
                   + this.nodes.get(0).getIp()
@@ -224,11 +224,29 @@ public abstract class ClusterEnvBase implements BaseEnv {
                   + this.nodes.get(0).getPort(),
               System.getProperty("User", "root"),
               System.getProperty("Password", "root"));
+      writeConnection = connection;
     } catch (ClassNotFoundException e) {
       e.printStackTrace();
       fail();
     }
-    return connection;
+    for (int i = 1; i < this.nodes.size(); i++) {
+      try {
+        Class.forName(Config.JDBC_DRIVER_NAME);
+        Connection readConnection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX
+                    + this.nodes.get(i).getIp()
+                    + ":"
+                    + this.nodes.get(i).getPort(),
+                System.getProperty("User", "root"),
+                System.getProperty("Password", "root"));
+        readConnections.add(readConnection);
+      } catch (ClassNotFoundException e) {
+        e.printStackTrace();
+        fail();
+      }
+    }
+    return new ClusterTestConnection(writeConnection, readConnections);
   }
 
   public IoTDBConnection getConnection(int queryTimeout) throws SQLException {
@@ -265,7 +283,7 @@ public abstract class ClusterEnvBase implements BaseEnv {
                   + ":"
                   + this.nodes.get(0).getPort()
                   + "?"
-                  + VERSION
+                  + Config.VERSION
                   + "="
                   + version.toString(),
               System.getProperty("User", "root"),
diff --git a/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
new file mode 100644
index 0000000000..5437c550a1
--- /dev/null
+++ b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.itbase.runtime;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+public class ClusterTestConnection implements Connection {
+
+  private final Connection writeConn;
+  private final List<Connection> readConns;
+
+  public ClusterTestConnection(Connection writeConn, List<Connection> readConns) {
+    this.writeConn = writeConn; // passed on to each ClusterTestStatement this class creates
+    this.readConns = readConns; // stored by reference, not copied
+  }
+
+  @Override
+  public Statement createStatement() throws SQLException {
+    return new ClusterTestStatement(writeConn, readConns); // the other overloads return null
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String nativeSQL(String sql) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setAutoCommit(boolean autoCommit) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean getAutoCommit() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void commit() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void rollback() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws SQLException {} // NOTE(review): wrapped connections stay open
+
+  @Override
+  public boolean isClosed() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public DatabaseMetaData getMetaData() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setReadOnly(boolean readOnly) throws SQLException {}
+
+  @Override
+  public boolean isReadOnly() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void setCatalog(String catalog) throws SQLException {}
+
+  @Override
+  public String getCatalog() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setTransactionIsolation(int level) throws SQLException {}
+
+  @Override
+  public int getTransactionIsolation() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {}
+
+  @Override
+  public Statement createStatement(int resultSetType, int resultSetConcurrency)
+      throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
+      throws SQLException {
+    return null;
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
+      throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Map<String, Class<?>> getTypeMap() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setTypeMap(Map<String, Class<?>> map) throws SQLException {}
+
+  @Override
+  public void setHoldability(int holdability) throws SQLException {}
+
+  @Override
+  public int getHoldability() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public Savepoint setSavepoint() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Savepoint setSavepoint(String name) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void rollback(Savepoint savepoint) throws SQLException {}
+
+  @Override
+  public void releaseSavepoint(Savepoint savepoint) throws SQLException {}
+
+  @Override
+  public Statement createStatement(
+      int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(
+      String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+      throws SQLException {
+    return null;
+  }
+
+  @Override
+  public CallableStatement prepareCall(
+      String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+      throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Clob createClob() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Blob createBlob() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public NClob createNClob() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public SQLXML createSQLXML() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public boolean isValid(int timeout) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void setClientInfo(String name, String value) throws SQLClientInfoException {}
+
+  @Override
+  public void setClientInfo(Properties properties) throws SQLClientInfoException {}
+
+  @Override
+  public String getClientInfo(String name) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Properties getClientInfo() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setSchema(String schema) throws SQLException {}
+
+  @Override
+  public String getSchema() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void abort(Executor executor) throws SQLException {}
+
+  @Override
+  public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {}
+
+  @Override
+  public int getNetworkTimeout() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    return false;
+  }
+}
diff --git a/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
new file mode 100644
index 0000000000..e9f6b7714e
--- /dev/null
+++ b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.itbase.runtime;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class ClusterTestResultSet implements ResultSet {
+
+  private final List<ResultSet> resultSets;
+
+  public ClusterTestResultSet(List<Statement> statements, String sql) throws SQLException {
+    resultSets = new ArrayList<>(statements.size());
+    for (Statement statement : statements) { // NOTE(review): a mid-loop failure leaks earlier sets
+      resultSets.add(statement.executeQuery(sql)); // the same SQL is run once per statement
+    }
+  }
+
+  @Override
+  public boolean next() throws SQLException {
+    // Advance every result set in lock-step; the first one is the reference.
+    final boolean expected = resultSets.get(0).next();
+    for (ResultSet other : resultSets.subList(1, resultSets.size())) {
+      if (other.next() != expected) {
+        throw new SQLException("datasets are inconsistent!");
+      }
+    }
+    return expected;
+  }
+
+  @Override
+  public void close() throws SQLException {
+    SQLException lastFailure = null; // keep closing after a failure; rethrow the latest one
+    for (int i = 0; i < resultSets.size(); i++) {
+      try {
+        resultSets.get(i).close();
+      } catch (SQLException closeError) {
+        lastFailure = closeError;
+      }
+    }
+    if (lastFailure != null) {
+      throw lastFailure;
+    }
+  }
+
+  @Override
+  public boolean wasNull() throws SQLException {
+    // Every result set must report the same wasNull() flag as the first one.
+    final boolean expected = resultSets.get(0).wasNull();
+    for (ResultSet other : resultSets.subList(1, resultSets.size())) {
+      if (other.wasNull() != expected) {
+        throw new SQLException("datasets are inconsistent!");
+      }
+    }
+    return expected;
+  }
+
+  @Override
+  public String getString(String columnLabel) throws SQLException {
+    String result = resultSets.get(0).getString(columnLabel); // first result set is the baseline
+    System.out.println("Result 0 get [" + columnLabel + "]: " + result);
+    for (int i = 1; i < resultSets.size(); i++) {
+      String currentResult = resultSets.get(i).getString(columnLabel);
+      System.out.println("Result " + i + " get [" + columnLabel + "]: " + currentResult);
+      if (!Objects.equals(result, currentResult)) { // null-safe: both null counts as equal
+        throw new SQLException("datasets are inconsistent!");
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public String getString(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public boolean getBoolean(int columnIndex) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public byte getByte(int columnIndex) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public short getShort(int columnIndex) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public int getInt(int columnIndex) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public long getLong(int columnIndex) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public float getFloat(int columnIndex) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public double getDouble(int columnIndex) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public byte[] getBytes(int columnIndex) throws SQLException {
+    return new byte[0];
+  }
+
+  @Override
+  public Date getDate(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Time getTime(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Timestamp getTimestamp(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public InputStream getAsciiStream(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public InputStream getUnicodeStream(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public InputStream getBinaryStream(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public boolean getBoolean(String columnLabel) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public byte getByte(String columnLabel) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public short getShort(String columnLabel) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public int getInt(String columnLabel) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public long getLong(String columnLabel) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public float getFloat(String columnLabel) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public double getDouble(String columnLabel) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public byte[] getBytes(String columnLabel) throws SQLException {
+    return new byte[0];
+  }
+
+  @Override
+  public Date getDate(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Time getTime(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Timestamp getTimestamp(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public InputStream getAsciiStream(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public InputStream getUnicodeStream(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public InputStream getBinaryStream(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {}
+
+  @Override
+  public String getCursorName() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Object getObject(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Object getObject(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public int findColumn(String columnLabel) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public Reader getCharacterStream(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Reader getCharacterStream(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public boolean isBeforeFirst() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean isAfterLast() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean isFirst() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean isLast() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void beforeFirst() throws SQLException {}
+
+  @Override
+  public void afterLast() throws SQLException {}
+
+  @Override
+  public boolean first() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean last() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public int getRow() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public boolean absolute(int row) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean relative(int rows) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean previous() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void setFetchDirection(int direction) throws SQLException {}
+
+  @Override
+  public int getFetchDirection() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public void setFetchSize(int rows) throws SQLException {}
+
+  @Override
+  public int getFetchSize() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public int getType() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public int getConcurrency() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public boolean rowUpdated() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean rowInserted() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean rowDeleted() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void updateNull(int columnIndex) throws SQLException {}
+
+  @Override
+  public void updateBoolean(int columnIndex, boolean x) throws SQLException {}
+
+  @Override
+  public void updateByte(int columnIndex, byte x) throws SQLException {}
+
+  @Override
+  public void updateShort(int columnIndex, short x) throws SQLException {}
+
+  @Override
+  public void updateInt(int columnIndex, int x) throws SQLException {}
+
+  @Override
+  public void updateLong(int columnIndex, long x) throws SQLException {}
+
+  @Override
+  public void updateFloat(int columnIndex, float x) throws SQLException {}
+
+  @Override
+  public void updateDouble(int columnIndex, double x) throws SQLException {}
+
+  @Override
+  public void updateBigDecimal(int columnIndex, BigDecimal x) throws SQLException {}
+
+  @Override
+  public void updateString(int columnIndex, String x) throws SQLException {}
+
+  @Override
+  public void updateBytes(int columnIndex, byte[] x) throws SQLException {}
+
+  @Override
+  public void updateDate(int columnIndex, Date x) throws SQLException {}
+
+  @Override
+  public void updateTime(int columnIndex, Time x) throws SQLException {}
+
+  @Override
+  public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException {}
+
+  @Override
+  public void updateAsciiStream(int columnIndex, InputStream x, int length) throws SQLException {}
+
+  @Override
+  public void updateBinaryStream(int columnIndex, InputStream x, int length) throws SQLException {}
+
+  @Override
+  public void updateCharacterStream(int columnIndex, Reader x, int length) throws SQLException {}
+
+  @Override
+  public void updateObject(int columnIndex, Object x, int scaleOrLength) throws SQLException {}
+
+  @Override
+  public void updateObject(int columnIndex, Object x) throws SQLException {}
+
+  @Override
+  public void updateNull(String columnLabel) throws SQLException {}
+
+  @Override
+  public void updateBoolean(String columnLabel, boolean x) throws SQLException {}
+
+  @Override
+  public void updateByte(String columnLabel, byte x) throws SQLException {}
+
+  @Override
+  public void updateShort(String columnLabel, short x) throws SQLException {}
+
+  @Override
+  public void updateInt(String columnLabel, int x) throws SQLException {}
+
+  @Override
+  public void updateLong(String columnLabel, long x) throws SQLException {}
+
+  @Override
+  public void updateFloat(String columnLabel, float x) throws SQLException {}
+
+  @Override
+  public void updateDouble(String columnLabel, double x) throws SQLException {}
+
+  @Override
+  public void updateBigDecimal(String columnLabel, BigDecimal x) throws SQLException {}
+
+  @Override
+  public void updateString(String columnLabel, String x) throws SQLException {}
+
+  @Override
+  public void updateBytes(String columnLabel, byte[] x) throws SQLException {}
+
+  @Override
+  public void updateDate(String columnLabel, Date x) throws SQLException {}
+
+  @Override
+  public void updateTime(String columnLabel, Time x) throws SQLException {}
+
+  @Override
+  public void updateTimestamp(String columnLabel, Timestamp x) throws SQLException {}
+
+  @Override
+  public void updateAsciiStream(String columnLabel, InputStream x, int length)
+      throws SQLException {}
+
+  @Override
+  public void updateBinaryStream(String columnLabel, InputStream x, int length)
+      throws SQLException {}
+
+  @Override
+  public void updateCharacterStream(String columnLabel, Reader reader, int length)
+      throws SQLException {}
+
+  @Override
+  public void updateObject(String columnLabel, Object x, int scaleOrLength) throws SQLException {}
+
+  @Override
+  public void updateObject(String columnLabel, Object x) throws SQLException {}
+
+  @Override // empty stub: no row is inserted
+  public void insertRow() throws SQLException {}
+
+  @Override // empty stub: no row is updated
+  public void updateRow() throws SQLException {}
+
+  @Override // empty stub: no row is deleted
+  public void deleteRow() throws SQLException {}
+
+  @Override // empty stub: the current row is not re-read
+  public void refreshRow() throws SQLException {}
+
+  @Override // empty stub: there are no pending updates to cancel (update* methods do nothing)
+  public void cancelRowUpdates() throws SQLException {}
+
+  @Override // empty stub: the cursor is not moved
+  public void moveToInsertRow() throws SQLException {}
+
+  @Override // empty stub: the cursor is not moved
+  public void moveToCurrentRow() throws SQLException {}
+
+  @Override // stub: always returns null. NOTE(review): callers must null-check - confirm intent
+  public Statement getStatement() throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null; neither the column nor the type map is consulted
+  public Object getObject(int columnIndex, Map<String, Class<?>> map) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public Ref getRef(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public Blob getBlob(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public Clob getClob(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public Array getArray(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null; neither the column nor the type map is consulted
+  public Object getObject(String columnLabel, Map<String, Class<?>> map) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public Ref getRef(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public Blob getBlob(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public Clob getClob(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public Array getArray(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null; the calendar is ignored
+  public Date getDate(int columnIndex, Calendar cal) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null; the calendar is ignored
+  public Date getDate(String columnLabel, Calendar cal) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null; the calendar is ignored
+  public Time getTime(int columnIndex, Calendar cal) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null; the calendar is ignored
+  public Time getTime(String columnLabel, Calendar cal) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null; the calendar is ignored
+  public Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null; the calendar is ignored
+  public Timestamp getTimestamp(String columnLabel, Calendar cal) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public URL getURL(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public URL getURL(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateRef(int columnIndex, Ref x) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateRef(String columnLabel, Ref x) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateBlob(int columnIndex, Blob x) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateBlob(String columnLabel, Blob x) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateClob(int columnIndex, Clob x) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateClob(String columnLabel, Clob x) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateArray(int columnIndex, Array x) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateArray(String columnLabel, Array x) throws SQLException {}
+
+  @Override // stub: always returns null
+  public RowId getRowId(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public RowId getRowId(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateRowId(int columnIndex, RowId x) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateRowId(String columnLabel, RowId x) throws SQLException {}
+
+  @Override // stub: always 0, which matches neither ResultSet holdability constant (1 or 2)
+  public int getHoldability() throws SQLException {
+    return 0;
+  }
+
+  @Override // stub: always false. NOTE(review): close() is outside this view - confirm intent
+  public boolean isClosed() throws SQLException {
+    return false;
+  }
+
+  @Override // empty stub: the supplied string is ignored
+  public void updateNString(int columnIndex, String nString) throws SQLException {}
+
+  @Override // empty stub: the supplied string is ignored
+  public void updateNString(String columnLabel, String nString) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateNClob(int columnIndex, NClob nClob) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateNClob(String columnLabel, NClob nClob) throws SQLException {}
+
+  @Override // stub: always returns null
+  public NClob getNClob(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public NClob getNClob(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public SQLXML getSQLXML(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public SQLXML getSQLXML(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateSQLXML(int columnIndex, SQLXML xmlObject) throws SQLException {}
+
+  @Override // empty stub: the supplied value is ignored
+  public void updateSQLXML(String columnLabel, SQLXML xmlObject) throws SQLException {}
+
+  @Override // stub: always returns null
+  public String getNString(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public String getNString(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public Reader getNCharacterStream(int columnIndex) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null
+  public Reader getNCharacterStream(String columnLabel) throws SQLException {
+    return null;
+  }
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateNCharacterStream(int columnIndex, Reader x, long length) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateNCharacterStream(String columnLabel, Reader reader, long length)
+      throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateAsciiStream(int columnIndex, InputStream x, long length) throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateBinaryStream(int columnIndex, InputStream x, long length) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateCharacterStream(int columnIndex, Reader x, long length) throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateAsciiStream(String columnLabel, InputStream x, long length)
+      throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateBinaryStream(String columnLabel, InputStream x, long length)
+      throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateCharacterStream(String columnLabel, Reader reader, long length)
+      throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateBlob(int columnIndex, InputStream inputStream, long length)
+      throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateBlob(String columnLabel, InputStream inputStream, long length)
+      throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateClob(int columnIndex, Reader reader, long length) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateClob(String columnLabel, Reader reader, long length) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateNClob(int columnIndex, Reader reader, long length) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateNClob(String columnLabel, Reader reader, long length) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateNCharacterStream(int columnIndex, Reader x) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateNCharacterStream(String columnLabel, Reader reader) throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateAsciiStream(int columnIndex, InputStream x) throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateBinaryStream(int columnIndex, InputStream x) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateCharacterStream(int columnIndex, Reader x) throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateAsciiStream(String columnLabel, InputStream x) throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateBinaryStream(String columnLabel, InputStream x) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateCharacterStream(String columnLabel, Reader reader) throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateBlob(int columnIndex, InputStream inputStream) throws SQLException {}
+
+  @Override // empty stub: the stream is neither read nor closed
+  public void updateBlob(String columnLabel, InputStream inputStream) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateClob(int columnIndex, Reader reader) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateClob(String columnLabel, Reader reader) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateNClob(int columnIndex, Reader reader) throws SQLException {}
+
+  @Override // empty stub: the reader is neither read nor closed
+  public void updateNClob(String columnLabel, Reader reader) throws SQLException {}
+
+  @Override // stub: always returns null; the requested type is not consulted
+  public <T> T getObject(int columnIndex, Class<T> type) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null; the requested type is not consulted
+  public <T> T getObject(String columnLabel, Class<T> type) throws SQLException {
+    return null;
+  }
+
+  @Override // stub: always returns null; it never unwraps and never throws
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    return null;
+  }
+
+  @Override // always false, i.e. this result set reports that it wraps nothing
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    return false;
+  }
+}
diff --git a/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
new file mode 100644
index 0000000000..bffeb95601
--- /dev/null
+++ b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.itbase.runtime;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link Statement} for integration tests that spreads one logical statement over several
+ * underlying JDBC statements.
+ *
+ * <p>Updates, batches and {@link #execute(String)} are delegated to a single statement created
+ * from the write connection. {@link #executeQuery(String)} hands the statements created from the
+ * read connections to a {@code ClusterTestResultSet}. Operations the tests do not need throw
+ * {@link UnsupportedOperationException}.
+ */
+public class ClusterTestStatement implements Statement {
+
+  /** Statement on the write connection; receives updates, batches and {@code execute}. */
+  private final Statement writeStatement;
+
+  /** One statement per read connection; all of them are passed to every query. */
+  private final List<Statement> readStatements = new ArrayList<>();
+
+  private boolean closed = false;
+
+  /** Smallest max-rows value reported by the underlying statements, or the last value set. */
+  private int maxRows = Integer.MAX_VALUE;
+
+  /** Smallest query timeout reported by the underlying statements, or the last value set. */
+  private int queryTimeout = Integer.MAX_VALUE;
+
+  /**
+   * Creates one statement on the write connection and one on each read connection.
+   *
+   * @param writeConnection connection used for updates, batches and {@code execute}
+   * @param readConnections connections used for {@code executeQuery}
+   * @throws SQLException if a statement cannot be created or its configuration cannot be read;
+   *     statements that were already created are closed before the exception propagates
+   */
+  public ClusterTestStatement(Connection writeConnection, List<Connection> readConnections)
+      throws SQLException {
+    this.writeStatement = writeConnection.createStatement();
+    try {
+      updateConfig(writeStatement);
+      for (Connection readConnection : readConnections) {
+        Statement readStatement = readConnection.createStatement();
+        this.readStatements.add(readStatement);
+        updateConfig(readStatement);
+      }
+    } catch (SQLException | RuntimeException e) {
+      // Do not leak the statements that were already opened when a later step fails.
+      try {
+        closeAllStatements();
+      } catch (SQLException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+  }
+
+  /** Lowers {@code maxRows} and {@code queryTimeout} to the given statement's values if smaller. */
+  private void updateConfig(Statement statement) throws SQLException {
+    maxRows = Math.min(statement.getMaxRows(), maxRows);
+    queryTimeout = Math.min(statement.getQueryTimeout(), queryTimeout);
+  }
+
+  /** Runs the query through a {@code ClusterTestResultSet} built over all read statements. */
+  @Override
+  public ResultSet executeQuery(String sql) throws SQLException {
+    return new ClusterTestResultSet(readStatements, sql);
+  }
+
+  /** Delegates to the write statement. */
+  @Override
+  public int executeUpdate(String sql) throws SQLException {
+    return writeStatement.executeUpdate(sql);
+  }
+
+  /**
+   * Closes the write statement and every read statement.
+   *
+   * <p>All statements are attempted even when one of them fails, so a single failure cannot leak
+   * the remaining ones. The first failure is rethrown with later ones attached as suppressed
+   * exceptions. Calling this method again after the first call is a no-op.
+   *
+   * @throws SQLException the first failure raised while closing an underlying statement
+   */
+  @Override
+  public void close() throws SQLException {
+    if (closed) {
+      return;
+    }
+    // Mark closed up front: every statement is attempted below regardless of failures.
+    closed = true;
+    closeAllStatements();
+  }
+
+  /** Closes every underlying statement, rethrowing the first failure once all were attempted. */
+  private void closeAllStatements() throws SQLException {
+    SQLException firstFailure = closeAndCollect(writeStatement, null);
+    for (Statement readStatement : readStatements) {
+      firstFailure = closeAndCollect(readStatement, firstFailure);
+    }
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
+  /**
+   * Closes one statement and folds a failure into the running first-failure.
+   *
+   * @return {@code firstFailure} if it was already set (the new failure is added as suppressed),
+   *     the new failure if it is the first one, or {@code firstFailure} unchanged on success
+   */
+  private static SQLException closeAndCollect(Statement statement, SQLException firstFailure) {
+    if (statement == null) {
+      return firstFailure;
+    }
+    try {
+      statement.close();
+    } catch (SQLException e) {
+      if (firstFailure == null) {
+        return e;
+      }
+      firstFailure.addSuppressed(e);
+    }
+    return firstFailure;
+  }
+
+  /** Delegates to the write statement. */
+  @Override
+  public int getMaxFieldSize() throws SQLException {
+    return writeStatement.getMaxFieldSize();
+  }
+
+  /** Delegates to the write statement only; read statements are not changed. */
+  @Override
+  public void setMaxFieldSize(int max) throws SQLException {
+    writeStatement.setMaxFieldSize(max);
+  }
+
+  /** Returns the cached max-rows value (see {@link #setMaxRows(int)}). */
+  @Override
+  public int getMaxRows() {
+    return maxRows;
+  }
+
+  /** Applies the limit to every read statement and caches it. */
+  @Override
+  public void setMaxRows(int max) throws SQLException {
+    for (Statement readStatement : readStatements) {
+      readStatement.setMaxRows(max);
+    }
+    maxRows = max;
+  }
+
+  /** Applies the setting to the write statement and to every read statement. */
+  @Override
+  public void setEscapeProcessing(boolean enable) throws SQLException {
+    writeStatement.setEscapeProcessing(enable);
+    for (Statement readStatement : readStatements) {
+      readStatement.setEscapeProcessing(enable);
+    }
+  }
+
+  /**
+   * Returns the cached query timeout: the smallest timeout reported by the underlying statements
+   * at construction, or the value last passed to {@link #setQueryTimeout(int)}.
+   */
+  @Override
+  public int getQueryTimeout() throws SQLException {
+    return queryTimeout;
+  }
+
+  /** Applies the timeout to every read statement and caches it, mirroring {@code setMaxRows}. */
+  @Override
+  public void setQueryTimeout(int seconds) throws SQLException {
+    for (Statement readStatement : readStatements) {
+      readStatement.setQueryTimeout(seconds);
+    }
+    queryTimeout = seconds;
+  }
+
+  @Override
+  public void cancel() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setCursorName(String name) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Delegates to the write statement. A result set produced this way cannot be fetched, because
+   * {@link #getResultSet()} is unsupported; use {@link #executeQuery(String)} for queries.
+   */
+  @Override
+  public boolean execute(String sql) throws SQLException {
+    return writeStatement.execute(sql);
+  }
+
+  /** Always throws: queries must go through {@link #executeQuery(String)}. */
+  @Override
+  public ResultSet getResultSet() throws SQLException {
+    throw new UnsupportedOperationException(
+        "In integration test you must use Statement.executeQuery() to query data");
+  }
+
+  /** Stub: always returns 0. */
+  @Override
+  public int getUpdateCount() throws SQLException {
+    return 0;
+  }
+
+  /** Stub: always returns false. */
+  @Override
+  public boolean getMoreResults() throws SQLException {
+    return false;
+  }
+
+  /** Stub: the direction hint is ignored. */
+  @Override
+  public void setFetchDirection(int direction) throws SQLException {}
+
+  /** Stub: always returns 0. */
+  @Override
+  public int getFetchDirection() throws SQLException {
+    return 0;
+  }
+
+  /** Stub: the fetch-size hint is ignored. */
+  @Override
+  public void setFetchSize(int rows) throws SQLException {}
+
+  /** Stub: always returns 0. */
+  @Override
+  public int getFetchSize() throws SQLException {
+    return 0;
+  }
+
+  /** Stub: always returns 0. */
+  @Override
+  public int getResultSetConcurrency() throws SQLException {
+    return 0;
+  }
+
+  /** Stub: always returns 0. */
+  @Override
+  public int getResultSetType() throws SQLException {
+    return 0;
+  }
+
+  /** Delegates to the write statement. */
+  @Override
+  public void addBatch(String sql) throws SQLException {
+    writeStatement.addBatch(sql);
+  }
+
+  /** Delegates to the write statement. */
+  @Override
+  public void clearBatch() throws SQLException {
+    writeStatement.clearBatch();
+  }
+
+  /** Delegates to the write statement. */
+  @Override
+  public int[] executeBatch() throws SQLException {
+    return writeStatement.executeBatch();
+  }
+
+  @Override
+  public Connection getConnection() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean getMoreResults(int current) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ResultSet getGeneratedKeys() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean execute(String sql, String[] columnNames) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getResultSetHoldability() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Returns whether {@link #close()} has been called on this statement. */
+  @Override
+  public boolean isClosed() throws SQLException {
+    return closed;
+  }
+
+  @Override
+  public void setPoolable(boolean poolable) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isPoolable() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void closeOnCompletion() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isCloseOnCompletion() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T unwrap(Class<T> iface) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
index f3bd89cd8b..3937c8b51a 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
@@ -129,13 +129,10 @@ public class IoTDBAggregationIT {
     String[] retArray = new String[] {"0,2", "0,4", "0,3"};
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
-
-      boolean hasResultSet =
-          statement.execute("SELECT count(temperature) FROM root.ln.wf01.wt01 WHERE time > 3");
-
-      Assert.assertTrue(hasResultSet);
       int cnt;
-      try (ResultSet resultSet = statement.getResultSet()) {
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "SELECT count(temperature) FROM root.ln.wf01.wt01 WHERE time > 3")) {
         cnt = 0;
         while (resultSet.next()) {
           String ans =
@@ -148,11 +145,9 @@ public class IoTDBAggregationIT {
         Assert.assertEquals(1, cnt);
       }
 
-      hasResultSet =
-          statement.execute(
-              "SELECT count(temperature) FROM root.ln.wf01.wt01 WHERE time > 3 order by time desc");
-      Assert.assertTrue(hasResultSet);
-      try (ResultSet resultSet = statement.getResultSet()) {
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "SELECT count(temperature) FROM root.ln.wf01.wt01 WHERE time > 3 order by time desc")) {
         cnt = 0;
         while (resultSet.next()) {
           String ans =
@@ -165,11 +160,9 @@ public class IoTDBAggregationIT {
         Assert.assertEquals(1, cnt);
       }
 
-      hasResultSet =
-          statement.execute("SELECT min_time(temperature) FROM root.ln.wf01.wt01 WHERE time > 3");
-
-      Assert.assertTrue(hasResultSet);
-      try (ResultSet resultSet = statement.getResultSet()) {
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "SELECT min_time(temperature) FROM root.ln.wf01.wt01 WHERE time > 3")) {
         while (resultSet.next()) {
           String ans =
               resultSet.getString(TIMESTAMP_STR)
@@ -181,12 +174,9 @@ public class IoTDBAggregationIT {
         Assert.assertEquals(2, cnt);
       }
 
-      hasResultSet =
-          statement.execute(
-              "SELECT min_time(temperature) FROM root.ln.wf01.wt01 WHERE temperature > 3");
-
-      Assert.assertTrue(hasResultSet);
-      try (ResultSet resultSet = statement.getResultSet()) {
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "SELECT min_time(temperature) FROM root.ln.wf01.wt01 WHERE temperature > 3")) {
         while (resultSet.next()) {
           String ans =
               resultSet.getString(TIMESTAMP_STR)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 5949a128d6..1e1153a02f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -161,7 +161,11 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 /** Thrift RPC implementation at server side. */
 public class TSServiceImpl implements TSIEventHandler {
 
-  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+  private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  public SessionManager getSessionManager() { // accessor the methods in this class now go through
+    return SESSION_MANAGER; // instance field initialised from SessionManager.getInstance()
+  }
 
   private class QueryTask implements Callable<TSExecuteStatementResp> {
 
@@ -203,13 +207,13 @@ public class TSServiceImpl implements TSIEventHandler {
 
     @Override
     public TSExecuteStatementResp call() throws Exception {
-      String username = SESSION_MANAGER.getUsername(sessionId);
+      String username = getSessionManager().getUsername(sessionId);
       plan.setLoginUserName(username);
 
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
       AUDIT_LOGGER.debug("Session {} execute Query: {}", sessionId, statement);
 
-      final long queryId = SESSION_MANAGER.requestQueryId(statementId, true);
+      final long queryId = getSessionManager().requestQueryId(statementId, true);
       QueryContext context =
           serviceProvider.genQueryContext(
               queryId, plan.isDebug(), queryStartTime, statement, timeout);
@@ -226,7 +230,7 @@ public class TSServiceImpl implements TSIEventHandler {
         resp.setQueryId(queryId);
         resp.setOperationType(plan.getOperatorType().toString());
       } catch (Exception e) {
-        SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
+        getSessionManager().releaseQueryResourceNoExceptions(queryId);
         throw e;
       } finally {
         addOperationLatency(Operation.EXECUTE_QUERY, queryStartTime);
@@ -255,15 +259,16 @@ public class TSServiceImpl implements TSIEventHandler {
 
     @Override
     public TSFetchResultsResp call() throws Exception {
-      QueryDataSet queryDataSet = SESSION_MANAGER.getDataset(queryId);
+      QueryDataSet queryDataSet = getSessionManager().getDataset(queryId);
       TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
       try {
         if (isAlign) {
           TSQueryDataSet result =
-              fillRpcReturnData(fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+              fillRpcReturnData(
+                  fetchSize, queryDataSet, getSessionManager().getUsername(sessionId));
           boolean hasResultSet = result.bufferForTime().limit() != 0;
           if (!hasResultSet) {
-            SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
+            getSessionManager().releaseQueryResourceNoExceptions(queryId);
           }
           resp.setHasResultSet(hasResultSet);
           resp.setQueryDataSet(result);
@@ -271,7 +276,7 @@ public class TSServiceImpl implements TSIEventHandler {
         } else {
           TSQueryNonAlignDataSet nonAlignResult =
               fillRpcNonAlignReturnData(
-                  fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+                  fetchSize, queryDataSet, getSessionManager().getUsername(sessionId));
           boolean hasResultSet = false;
           for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
             if (timeBuffer.limit() != 0) {
@@ -280,7 +285,7 @@ public class TSServiceImpl implements TSIEventHandler {
             }
           }
           if (!hasResultSet) {
-            SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
+            getSessionManager().releaseQueryResourceNoExceptions(queryId);
           }
           resp.setHasResultSet(hasResultSet);
           resp.setNonAlignQueryDataSet(nonAlignResult);
@@ -289,7 +294,7 @@ public class TSServiceImpl implements TSIEventHandler {
         QUERY_TIME_MANAGER.unRegisterQuery(queryId, false);
         return resp;
       } catch (Exception e) {
-        SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
+        getSessionManager().releaseQueryResourceNoExceptions(queryId);
         throw e;
       }
     }
@@ -312,8 +317,9 @@ public class TSServiceImpl implements TSIEventHandler {
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
     BasicOpenSessionResp openSessionResp =
-        SESSION_MANAGER.openSession(
-            req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
+        getSessionManager()
+            .openSession(
+                req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
     TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
     TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
     return resp.setSessionId(openSessionResp.getSessionId());
@@ -330,7 +336,7 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus closeSession(TSCloseSessionReq req) {
     return new TSStatus(
-        !SESSION_MANAGER.closeSession(req.sessionId)
+        !getSessionManager().closeSession(req.sessionId)
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -343,15 +349,20 @@ public class TSServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus closeOperation(TSCloseOperationReq req) {
-    return SESSION_MANAGER.closeOperation(
-        req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+    return getSessionManager()
+        .closeOperation(
+            req.sessionId,
+            req.queryId,
+            req.statementId,
+            req.isSetStatementId(),
+            req.isSetQueryId());
   }
 
   @Override
   public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
     TSFetchMetadataResp resp = new TSFetchMetadataResp();
 
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!getSessionManager().checkLogin(req.getSessionId())) {
       return resp.setStatus(getNotLoggedInStatus());
     }
 
@@ -497,7 +508,7 @@ public class TSServiceImpl implements TSIEventHandler {
     long t1 = System.currentTimeMillis();
     List<TSStatus> result = new ArrayList<>();
     boolean isAllSuccessful = true;
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!getSessionManager().checkLogin(req.getSessionId())) {
       return getNotLoggedInStatus();
     }
 
@@ -514,8 +525,8 @@ public class TSServiceImpl implements TSIEventHandler {
                 .getPlanner()
                 .parseSQLToPhysicalPlan(
                     statement,
-                    SESSION_MANAGER.getZoneId(req.sessionId),
-                    SESSION_MANAGER.getClientVersion(req.sessionId));
+                    getSessionManager().getZoneId(req.sessionId),
+                    getSessionManager().getClientVersion(req.sessionId));
         if (physicalPlan.isQuery() || physicalPlan.isSelectInto()) {
           throw new QueryInBatchStatementException(statement);
         }
@@ -529,7 +540,7 @@ public class TSServiceImpl implements TSIEventHandler {
             index = 0;
           }
 
-          TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, req.getSessionId());
+          TSStatus status = getSessionManager().checkAuthority(physicalPlan, req.getSessionId());
           if (status != null) {
             insertRowsPlan.getResults().put(index, status);
             isAllSuccessful = false;
@@ -551,7 +562,7 @@ public class TSServiceImpl implements TSIEventHandler {
             multiPlan = new CreateMultiTimeSeriesPlan();
             executeList.add(multiPlan);
           }
-          TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, req.getSessionId());
+          TSStatus status = getSessionManager().checkAuthority(physicalPlan, req.getSessionId());
           if (status != null) {
             multiPlan.getResults().put(i, status);
             isAllSuccessful = false;
@@ -603,7 +614,7 @@ public class TSServiceImpl implements TSIEventHandler {
   public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
     String statement = req.getStatement();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
       }
 
@@ -613,8 +624,8 @@ public class TSServiceImpl implements TSIEventHandler {
               .getPlanner()
               .parseSQLToPhysicalPlan(
                   statement,
-                  SESSION_MANAGER.getZoneId(req.getSessionId()),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  getSessionManager().getZoneId(req.getSessionId()),
+                  getSessionManager().getClientVersion(req.sessionId));
 
       if (physicalPlan.isQuery()) {
         return submitQueryTask(physicalPlan, startTime, req);
@@ -641,7 +652,7 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
       }
 
@@ -652,8 +663,8 @@ public class TSServiceImpl implements TSIEventHandler {
               .getPlanner()
               .parseSQLToPhysicalPlan(
                   statement,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  getSessionManager().getZoneId(req.sessionId),
+                  getSessionManager().getClientVersion(req.sessionId));
 
       if (physicalPlan.isQuery()) {
         return submitQueryTask(physicalPlan, startTime, req);
@@ -677,7 +688,7 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
       }
 
@@ -687,8 +698,8 @@ public class TSServiceImpl implements TSIEventHandler {
               .getPlanner()
               .rawDataQueryReqToPhysicalPlan(
                   req,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  getSessionManager().getZoneId(req.sessionId),
+                  getSessionManager().getClientVersion(req.sessionId));
 
       if (physicalPlan.isQuery()) {
         Future<TSExecuteStatementResp> resp =
@@ -723,7 +734,7 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
       }
 
@@ -733,8 +744,8 @@ public class TSServiceImpl implements TSIEventHandler {
               .getPlanner()
               .lastDataQueryReqToPhysicalPlan(
                   req,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  getSessionManager().getZoneId(req.sessionId),
+                  getSessionManager().getClientVersion(req.sessionId));
 
       if (physicalPlan.isQuery()) {
         Future<TSExecuteStatementResp> resp =
@@ -797,7 +808,7 @@ public class TSServiceImpl implements TSIEventHandler {
     List<? extends PartialPath> authPaths = plan.getAuthPaths();
     if (authPaths != null
         && !authPaths.isEmpty()
-        && !SESSION_MANAGER.checkAuthorization(plan, username)) {
+        && !getSessionManager().checkAuthorization(plan, username)) {
       return RpcUtils.getTSExecuteStatementResp(
           RpcUtils.getStatus(
               TSStatusCode.NO_PERMISSION_ERROR,
@@ -919,13 +930,13 @@ public class TSServiceImpl implements TSIEventHandler {
       long sessionId)
       throws IoTDBException, TException, SQLException, IOException, InterruptedException,
           QueryFilterOptimizationException {
-    TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, sessionId);
+    TSStatus status = getSessionManager().checkAuthority(physicalPlan, sessionId);
     if (status != null) {
       return new TSExecuteStatementResp(status);
     }
 
     final long startTime = System.currentTimeMillis();
-    final long queryId = SESSION_MANAGER.requestQueryId(statementId, true);
+    final long queryId = getSessionManager().requestQueryId(statementId, true);
     QueryContext context =
         serviceProvider.genQueryContext(
             queryId, physicalPlan.isDebug(), startTime, statement, timeout);
@@ -934,7 +945,7 @@ public class TSServiceImpl implements TSIEventHandler {
 
     QUERY_FREQUENCY_RECORDER.incrementAndGet();
     AUDIT_LOGGER.debug(
-        "Session {} execute select into: {}", SESSION_MANAGER.getCurrSessionId(), statement);
+        "Session {} execute select into: {}", getSessionManager().getCurrSessionId(), statement);
     if (queryPlan.isEnableTracing()) {
       TRACING_MANAGER.setSeriesPathNum(queryId, queryPlan.getPaths().size());
     }
@@ -961,7 +972,7 @@ public class TSServiceImpl implements TSIEventHandler {
 
       return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS).setQueryId(queryId);
     } finally {
-      SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
+      getSessionManager().releaseQueryResourceNoExceptions(queryId);
       addOperationLatency(Operation.EXECUTE_SELECT_INTO, startTime);
       long costTime = System.currentTimeMillis() - startTime;
       if (costTime >= CONFIG.getSlowQueryThreshold()) {
@@ -975,7 +986,7 @@ public class TSServiceImpl implements TSIEventHandler {
     InsertMultiTabletsPlan insertMultiTabletsPlan = new InsertMultiTabletsPlan();
     for (int i = 0; i < insertTabletPlans.size(); i++) {
       InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i);
-      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, sessionId);
+      TSStatus status = getSessionManager().checkAuthority(insertTabletPlan, sessionId);
 
       if (status != null) {
         // not authorized
@@ -991,11 +1002,11 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
       }
 
-      if (!SESSION_MANAGER.hasDataset(req.queryId)) {
+      if (!getSessionManager().hasDataset(req.queryId)) {
         return RpcUtils.getTSFetchResultsResp(
             RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
       }
@@ -1057,7 +1068,7 @@ public class TSServiceImpl implements TSIEventHandler {
   /** update statement can be: 1. select-into statement 2. non-query statement */
   @Override
   public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!getSessionManager().checkLogin(req.getSessionId())) {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
     }
 
@@ -1067,8 +1078,8 @@ public class TSServiceImpl implements TSIEventHandler {
               .getPlanner()
               .parseSQLToPhysicalPlan(
                   req.statement,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  getSessionManager().getZoneId(req.sessionId),
+                  getSessionManager().getClientVersion(req.sessionId));
       return physicalPlan.isQuery()
           ? RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
@@ -1108,16 +1119,16 @@ public class TSServiceImpl implements TSIEventHandler {
   }
 
   private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan, long sessionId) {
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+    TSStatus status = getSessionManager().checkAuthority(plan, sessionId);
     return status != null
         ? new TSExecuteStatementResp(status)
         : RpcUtils.getTSExecuteStatementResp(executeNonQueryPlan(plan))
-            .setQueryId(SESSION_MANAGER.requestQueryId(false));
+            .setQueryId(getSessionManager().requestQueryId(false));
   }
 
   @Override
   public void handleClientExit() {
-    Long sessionId = SESSION_MANAGER.getCurrSessionId();
+    Long sessionId = getSessionManager().getCurrSessionId();
     if (sessionId != null) {
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
       closeSession(req);
@@ -1127,7 +1138,7 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSGetTimeZoneResp getTimeZone(long sessionId) {
     try {
-      ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+      ZoneId zoneId = getSessionManager().getZoneId(sessionId);
       return new TSGetTimeZoneResp(
           RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
           zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1142,7 +1153,7 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus setTimeZone(TSSetTimeZoneReq req) {
     try {
-      SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+      getSessionManager().setTimezone(req.sessionId, req.timeZone);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
       return onNPEOrUnexpectedException(
@@ -1178,14 +1189,14 @@ public class TSServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus insertRecords(TSInsertRecordsReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!getSessionManager().checkLogin(req.getSessionId())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          getSessionManager().getCurrSessionId(),
           req.prefixPaths.get(0),
           req.getTimestamps().get(0));
     }
@@ -1200,7 +1211,7 @@ public class TSServiceImpl implements TSIEventHandler {
                 req.getMeasurementsList().get(i).toArray(new String[0]),
                 req.valuesList.get(i),
                 req.isAligned);
-        TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+        TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
           allCheckSuccess = false;
@@ -1250,14 +1261,14 @@ public class TSServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!getSessionManager().checkLogin(req.getSessionId())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          getSessionManager().getCurrSessionId(),
           req.prefixPath,
           req.getTimestamps().get(0));
     }
@@ -1271,7 +1282,7 @@ public class TSServiceImpl implements TSIEventHandler {
               req.getMeasurementsList(),
               req.getValuesList(),
               req.isAligned);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
       statusList.add(status != null ? status : executeNonQueryPlan(plan));
     } catch (IoTDBException e) {
       statusList.add(
@@ -1296,14 +1307,14 @@ public class TSServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!getSessionManager().checkLogin(req.getSessionId())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          getSessionManager().getCurrSessionId(),
           req.prefixPath,
           req.getTimestamps().get(0));
     }
@@ -1319,7 +1330,7 @@ public class TSServiceImpl implements TSIEventHandler {
         plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
         plan.setNeedInferType(true);
         plan.setAligned(req.isAligned);
-        TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+        TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
 
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
@@ -1354,14 +1365,14 @@ public class TSServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!getSessionManager().checkLogin(req.getSessionId())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          getSessionManager().getCurrSessionId(),
           req.prefixPaths.get(0),
           req.getTimestamps().get(0));
     }
@@ -1377,7 +1388,7 @@ public class TSServiceImpl implements TSIEventHandler {
         plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
         plan.setNeedInferType(true);
         plan.setAligned(req.isAligned);
-        TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+        TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
 
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
@@ -1468,13 +1479,13 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus insertRecord(TSInsertRecordReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return getNotLoggedInStatus();
       }
 
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          getSessionManager().getCurrSessionId(),
           req.getPrefixPath(),
           req.getTimestamp());
 
@@ -1485,7 +1496,7 @@ public class TSServiceImpl implements TSIEventHandler {
               req.getMeasurements().toArray(new String[0]),
               req.values,
               req.isAligned);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_RECORD, e.getErrorCode());
@@ -1498,13 +1509,13 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return getNotLoggedInStatus();
       }
 
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          getSessionManager().getCurrSessionId(),
           req.getPrefixPath(),
           req.getTimestamp());
 
@@ -1516,7 +1527,7 @@ public class TSServiceImpl implements TSIEventHandler {
       plan.setValues(req.getValues().toArray(new Object[0]));
       plan.setNeedInferType(true);
       plan.setAligned(req.isAligned);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_STRING_RECORD, e.getErrorCode());
@@ -1529,7 +1540,7 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus deleteData(TSDeleteDataReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return getNotLoggedInStatus();
       }
 
@@ -1541,7 +1552,7 @@ public class TSServiceImpl implements TSIEventHandler {
         paths.add(new PartialPath(path));
       }
       plan.addPaths(paths);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
 
       return status != null ? new TSStatus(status) : new TSStatus(executeNonQueryPlan(plan));
     } catch (IoTDBException e) {
@@ -1556,7 +1567,7 @@ public class TSServiceImpl implements TSIEventHandler {
   public TSStatus insertTablet(TSInsertTabletReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return getNotLoggedInStatus();
       }
 
@@ -1571,7 +1582,7 @@ public class TSServiceImpl implements TSIEventHandler {
       insertTabletPlan.setRowCount(req.size);
       insertTabletPlan.setDataTypes(req.types);
       insertTabletPlan.setAligned(req.isAligned);
-      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(insertTabletPlan, req.getSessionId());
 
       return status != null ? status : executeNonQueryPlan(insertTabletPlan);
     } catch (IoTDBException e) {
@@ -1588,7 +1599,7 @@ public class TSServiceImpl implements TSIEventHandler {
   public TSStatus insertTablets(TSInsertTabletsReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return getNotLoggedInStatus();
       }
 
@@ -1633,7 +1644,7 @@ public class TSServiceImpl implements TSIEventHandler {
     InsertMultiTabletsPlan insertMultiTabletsPlan = new InsertMultiTabletsPlan();
     for (int i = 0; i < req.prefixPaths.size(); i++) {
       InsertTabletPlan insertTabletPlan = constructInsertTabletPlan(req, i);
-      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(insertTabletPlan, req.getSessionId());
       if (status != null) {
         // not authorized
         insertMultiTabletsPlan.getResults().put(i, status);
@@ -1648,12 +1659,12 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus setStorageGroup(long sessionId, String storageGroup) {
     try {
-      if (!SESSION_MANAGER.checkLogin(sessionId)) {
+      if (!getSessionManager().checkLogin(sessionId)) {
         return getNotLoggedInStatus();
       }
 
       SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup));
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+      TSStatus status = getSessionManager().checkAuthority(plan, sessionId);
 
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -1667,7 +1678,7 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
     try {
-      if (!SESSION_MANAGER.checkLogin(sessionId)) {
+      if (!getSessionManager().checkLogin(sessionId)) {
         return getNotLoggedInStatus();
       }
 
@@ -1676,7 +1687,7 @@ public class TSServiceImpl implements TSIEventHandler {
         storageGroupList.add(new PartialPath(storageGroup));
       }
       DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+      TSStatus status = getSessionManager().checkAuthority(plan, sessionId);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode());
@@ -1689,13 +1700,15 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
-            "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+            "Session-{} create timeseries {}",
+            getSessionManager().getCurrSessionId(),
+            req.getPath());
       }
 
       CreateTimeSeriesPlan plan =
@@ -1708,7 +1721,7 @@ public class TSServiceImpl implements TSIEventHandler {
               req.tags,
               req.attributes,
               req.measurementAlias);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode());
@@ -1721,7 +1734,7 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return getNotLoggedInStatus();
       }
 
@@ -1739,7 +1752,7 @@ public class TSServiceImpl implements TSIEventHandler {
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create aligned timeseries {}.{}",
-            SESSION_MANAGER.getCurrSessionId(),
+            getSessionManager().getCurrSessionId(),
             req.getPrefixPath(),
             req.getMeasurements());
       }
@@ -1767,7 +1780,7 @@ public class TSServiceImpl implements TSIEventHandler {
               req.measurementAlias,
               req.tagsList,
               req.attributesList);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode());
@@ -1781,14 +1794,14 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create {} timeseries, the first is {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            getSessionManager().getCurrSessionId(),
             req.getPaths().size(),
             req.getPaths().get(0));
       }
@@ -1819,7 +1832,7 @@ public class TSServiceImpl implements TSIEventHandler {
       CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
       for (int i = 0; i < req.paths.size(); i++) {
         plan.setPath(new PartialPath(req.paths.get(i)));
-        TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+        TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
         if (status != null) {
           // not authorized
           multiPlan.getResults().put(i, status);
@@ -1868,7 +1881,7 @@ public class TSServiceImpl implements TSIEventHandler {
   @Override
   public TSStatus deleteTimeseries(long sessionId, List<String> paths) {
     try {
-      if (!SESSION_MANAGER.checkLogin(sessionId)) {
+      if (!getSessionManager().checkLogin(sessionId)) {
         return getNotLoggedInStatus();
       }
 
@@ -1877,7 +1890,7 @@ public class TSServiceImpl implements TSIEventHandler {
         pathList.add(new PartialPath(path));
       }
       DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+      TSStatus status = getSessionManager().checkAuthority(plan, sessionId);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode());
@@ -1889,20 +1902,20 @@ public class TSServiceImpl implements TSIEventHandler {
 
   @Override
   public long requestStatementId(long sessionId) {
-    return SESSION_MANAGER.requestStatementId(sessionId);
+    return getSessionManager().requestStatementId(sessionId);
   }
 
   @Override
   public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!getSessionManager().checkLogin(req.getSessionId())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create schema template {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            getSessionManager().getCurrSessionId(),
             req.getName());
       }
 
@@ -1910,7 +1923,7 @@ public class TSServiceImpl implements TSIEventHandler {
       // Construct plan from serialized request
       ByteBuffer buffer = ByteBuffer.wrap(req.getSerializedTemplate());
       plan = CreateTemplatePlan.deserializeFromReq(buffer);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
 
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (Exception e) {
@@ -1937,7 +1950,7 @@ public class TSServiceImpl implements TSIEventHandler {
     AppendTemplatePlan plan =
         new AppendTemplatePlan(
             req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes);
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+    TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
@@ -1945,7 +1958,7 @@ public class TSServiceImpl implements TSIEventHandler {
   public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
     PruneTemplatePlan plan =
         new PruneTemplatePlan(req.getName(), Collections.singletonList(req.getPath()));
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+    TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
@@ -1999,21 +2012,21 @@ public class TSServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!getSessionManager().checkLogin(req.getSessionId())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} set device template {}.{}",
-          SESSION_MANAGER.getCurrSessionId(),
+          getSessionManager().getCurrSessionId(),
           req.getTemplateName(),
           req.getPrefixPath());
     }
 
     try {
       SetTemplatePlan plan = new SetTemplatePlan(req.templateName, req.prefixPath);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IllegalPathException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2022,21 +2035,21 @@ public class TSServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!getSessionManager().checkLogin(req.getSessionId())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} unset schema template {}.{}",
-          SESSION_MANAGER.getCurrSessionId(),
+          getSessionManager().getCurrSessionId(),
           req.getPrefixPath(),
           req.getTemplateName());
     }
 
     try {
       UnsetTemplatePlan plan = new UnsetTemplatePlan(req.prefixPath, req.templateName);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IllegalPathException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2045,19 +2058,19 @@ public class TSServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!getSessionManager().checkLogin(req.getSessionId())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} drop schema template {}.",
-          SESSION_MANAGER.getCurrSessionId(),
+          getSessionManager().getCurrSessionId(),
           req.getTemplateName());
     }
 
     DropTemplatePlan plan = new DropTemplatePlan(req.templateName);
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+    TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
     return status != null ? status : executeNonQueryPlan(plan);
   }