You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/02 07:17:11 UTC

[iotdb] 02/02: Save one rpc call

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-3773
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bc31698ac8116b4e0312e0fa6340cfb01d4e609e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Nov 2 15:16:46 2022 +0800

    Save one rpc call
---
 .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java   |  72 +++++++-------
 .../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java  |  12 +--
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |   5 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 105 +++++++++++++++------
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  14 ++-
 .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java |  15 +--
 .../apache/iotdb/session/SessionConnection.java    |   9 +-
 .../org/apache/iotdb/session/SessionDataSet.java   |  10 +-
 thrift/src/main/thrift/client.thrift               |   2 +
 9 files changed, 154 insertions(+), 90 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
index f900c32c99..2977eb60dc 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
@@ -606,12 +606,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -651,12 +651,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -706,7 +706,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -952,7 +952,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -1061,7 +1061,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -1116,12 +1116,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -1230,12 +1230,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -1330,7 +1330,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -1399,7 +1399,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -1452,12 +1452,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -1501,12 +1501,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -1705,7 +1705,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -1761,12 +1761,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -1806,12 +1806,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -1952,7 +1952,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -2006,12 +2006,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -2047,12 +2047,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -2171,7 +2171,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -2211,7 +2211,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -2383,7 +2383,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -2560,7 +2560,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -2763,7 +2763,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         true,
         client,
         null,
-        0,
+        -1,
         sessionId,
         Collections.singletonList(tsBlock),
         null,
@@ -2806,12 +2806,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
@@ -2860,12 +2860,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
         false,
         client,
         null,
-        0,
+        -1,
         sessionId,
         null,
         null,
         (long) 60 * 1000,
-        true);
+        false);
   }
 
   @Override
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
index 1222ad128e..8fc39b31e3 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
@@ -60,8 +60,6 @@ public class IoTDBJDBCResultSet implements ResultSet {
   protected List<String> columnTypeList;
   protected IoTDBRpcDataSet ioTDBRpcDataSet;
   protected IoTDBTracingInfo ioTDBRpcTracingInfo;
-  private boolean isRpcFetchResult = true;
-
   private String operationType = "";
   private List<String> columns = null;
   private List<String> sgColumns = null;
@@ -82,7 +80,8 @@ public class IoTDBJDBCResultSet implements ResultSet {
       String operationType,
       List<String> columns,
       List<String> sgColumns,
-      BitSet aliasColumnMap)
+      BitSet aliasColumnMap,
+      boolean moreData)
       throws SQLException {
     this.ioTDBRpcDataSet =
         new IoTDBRpcDataSet(
@@ -91,7 +90,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
             columnTypeList,
             columnNameIndex,
             ignoreTimeStamp,
-            true,
+            moreData,
             queryId,
             ((IoTDBStatement) statement).getStmtId(),
             client,
@@ -125,7 +124,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
       List<ByteBuffer> dataSet,
       TSTracingInfo tracingInfo,
       long timeout,
-      boolean isRpcFetchResult)
+      boolean moreData)
       throws SQLException {
     this.ioTDBRpcDataSet =
         new IoTDBRpcDataSet(
@@ -134,7 +133,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
             columnTypeList,
             columnNameIndex,
             ignoreTimeStamp,
-            isRpcFetchResult,
+            moreData,
             queryId,
             ((IoTDBStatement) statement).getStmtId(),
             client,
@@ -144,7 +143,6 @@ public class IoTDBJDBCResultSet implements ResultSet {
             timeout);
     this.statement = statement;
     this.columnTypeList = columnTypeList;
-    this.isRpcFetchResult = isRpcFetchResult;
     if (tracingInfo != null) {
       ioTDBRpcTracingInfo = new IoTDBTracingInfo();
       ioTDBRpcTracingInfo.setTsTracingInfo(tracingInfo);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 3fb925e46b..b3633436ef 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -302,7 +302,7 @@ public class IoTDBStatement implements Statement {
                 execResp.queryResult,
                 execResp.tracingInfo,
                 execReq.timeout,
-                true);
+                execResp.moreData);
       }
       return true;
     }
@@ -457,7 +457,8 @@ public class IoTDBStatement implements Statement {
               execResp.operationType,
               execResp.columns,
               execResp.sgColumns,
-              aliasColumn);
+              aliasColumn,
+              execResp.moreData);
     }
     return resultSet;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 0b8526bad2..2efdf6a85a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -112,6 +112,7 @@ import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -149,19 +150,25 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @FunctionalInterface
   public interface SelectResult {
-    public void apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize)
+    boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize)
         throws IoTDBException, IOException;
   }
 
   private static final SelectResult SELECT_RESULT =
-      (resp, queryExecution, fetchSize) ->
-          resp.setQueryResult(
-              QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize));
+      (resp, queryExecution, fetchSize) -> {
+        Pair<List<ByteBuffer>, Boolean> pair =
+            QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize);
+        resp.setQueryResult(pair.left);
+        return pair.right;
+      };
 
   private static final SelectResult OLD_SELECT_RESULT =
-      (resp, queryExecution, fetchSize) ->
-          resp.setQueryDataSet(
-              QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize));
+      (resp, queryExecution, fetchSize) -> {
+        Pair<TSQueryDataSet, Boolean> pair =
+            QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize);
+        resp.setQueryDataSet(pair.left);
+        return pair.right;
+      };
 
   public ClientRPCServiceImpl() {
     if (config.isClusterMode()) {
@@ -175,6 +182,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   private TSExecuteStatementResp executeStatementInternal(
       TSExecuteStatementReq req, SelectResult setResult) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
     String statement = req.getStatement();
     if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
@@ -200,8 +209,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
       AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
 
-      long queryId =
-          SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+      queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
       // create and cache dataset
       ExecutionResult result =
           COORDINATOR.execute(
@@ -225,22 +233,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
         if (queryExecution != null && queryExecution.isQuery()) {
           resp = createResponse(queryExecution.getDatasetHeader(), queryId);
           resp.setStatus(result.status);
-          setResult.apply(resp, queryExecution, req.fetchSize);
+          finished = setResult.apply(resp, queryExecution, req.fetchSize);
+          resp.setMoreData(!finished);
         } else {
           resp = RpcUtils.getTSExecuteStatementResp(result.status);
         }
         return resp;
       }
     } catch (Exception e) {
+      finished = true;
       return RpcUtils.getTSExecuteStatementResp(
           onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
     } finally {
       addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(queryId);
+      }
     }
   }
 
   private TSExecuteStatementResp executeRawDataQueryInternal(
       TSRawDataQueryReq req, SelectResult setResult) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
     if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
     }
@@ -257,8 +272,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
       AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req);
-      long queryId =
-          SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+      queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
       // create and cache dataset
       ExecutionResult result =
           COORDINATOR.execute(
@@ -281,23 +295,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
         if (queryExecution.isQuery()) {
           resp = createResponse(queryExecution.getDatasetHeader(), queryId);
           resp.setStatus(result.status);
-          setResult.apply(resp, queryExecution, req.fetchSize);
+          finished = setResult.apply(resp, queryExecution, req.fetchSize);
+          resp.setMoreData(!finished);
         } else {
           resp = RpcUtils.getTSExecuteStatementResp(result.status);
         }
         return resp;
       }
     } catch (Exception e) {
-      // TODO call the coordinator to release query resource
+      finished = true;
       return RpcUtils.getTSExecuteStatementResp(
           onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
     } finally {
       addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(queryId);
+      }
     }
   }
 
   private TSExecuteStatementResp executeLastDataQueryInternal(
       TSLastDataQueryReq req, SelectResult setResult) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
     if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
     }
@@ -312,8 +332,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
       AUDIT_LOGGER.debug("Session {} execute Last Data Query: {}", req.sessionId, req);
-      long queryId =
-          SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+      queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
       // create and cache dataset
       ExecutionResult result =
           COORDINATOR.execute(
@@ -336,7 +355,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
         if (queryExecution.isQuery()) {
           resp = createResponse(queryExecution.getDatasetHeader(), queryId);
           resp.setStatus(result.status);
-          setResult.apply(resp, queryExecution, req.fetchSize);
+          finished = setResult.apply(resp, queryExecution, req.fetchSize);
+          resp.setMoreData(!finished);
         } else {
           resp = RpcUtils.getTSExecuteStatementResp(result.status);
         }
@@ -344,11 +364,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
     } catch (Exception e) {
-      // TODO call the coordinator to release query resource
+      finished = true;
       return RpcUtils.getTSExecuteStatementResp(
           onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
     } finally {
       addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(queryId);
+      }
     }
   }
 
@@ -379,6 +402,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
+    long startTime = System.currentTimeMillis();
+    boolean finished = false;
     try {
       if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
@@ -386,20 +411,33 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
 
       IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
+
+      if (queryExecution == null) {
+        resp.setHasResultSet(false);
+        resp.setMoreData(true);
+        return resp;
+      }
+
       try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
-        List<ByteBuffer> result =
+        Pair<List<ByteBuffer>, Boolean> pair =
             QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize);
+        List<ByteBuffer> result = pair.left;
+        finished = pair.right;
         boolean hasResultSet = !(result.size() == 0);
         resp.setHasResultSet(hasResultSet);
         resp.setIsAlign(true);
         resp.setQueryResult(result);
-        if (!hasResultSet) {
-          COORDINATOR.cleanupQueryExecution(req.queryId);
-        }
+        resp.setMoreData(!finished);
         return resp;
       }
     } catch (Exception e) {
+      finished = true;
       return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(req.queryId);
+      }
     }
   }
 
@@ -857,6 +895,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
+    boolean finished = false;
+    long startTime = System.currentTimeMillis();
     try {
       if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
@@ -865,21 +905,32 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
 
       IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
+      if (queryExecution == null) {
+        resp.setHasResultSet(false);
+        resp.setMoreData(true);
+        return resp;
+      }
+
       try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
-        TSQueryDataSet result =
+        Pair<TSQueryDataSet, Boolean> pair =
             QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize);
+        TSQueryDataSet result = pair.left;
+        finished = pair.right;
         boolean hasResultSet = result.bufferForTime().limit() != 0;
-
         resp.setHasResultSet(hasResultSet);
         resp.setQueryDataSet(result);
         resp.setIsAlign(true);
-        if (!hasResultSet) {
-          COORDINATOR.cleanupQueryExecution(req.queryId);
-        }
+        resp.setMoreData(!finished); // more data remains only while the query is unfinished
         return resp;
       }
     } catch (Exception e) {
+      finished = true;
       return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(req.queryId);
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index ca603bf535..ff3cda054a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -179,8 +180,9 @@ public class QueryDataSetUtils {
     return tsQueryDataSet;
   }
 
-  public static TSQueryDataSet convertTsBlockByFetchSize(
+  public static Pair<TSQueryDataSet, Boolean> convertTsBlockByFetchSize(
       IQueryExecution queryExecution, int fetchSize) throws IOException, IoTDBException {
+    boolean finished = false;
     int columnNum = queryExecution.getOutputValueColumnCount();
     TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
     // one time column and each value column has an actual value buffer and a bitmap value to
@@ -201,6 +203,7 @@ public class QueryDataSetUtils {
     while (rowCount < fetchSize) {
       Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
       if (!optionalTsBlock.isPresent()) {
+        finished = true;
         break;
       }
       TsBlock tsBlock = optionalTsBlock.get();
@@ -371,17 +374,20 @@ public class QueryDataSetUtils {
     }
     tsQueryDataSet.setBitmapList(bitmapList);
     tsQueryDataSet.setValueList(valueList);
-    return tsQueryDataSet;
+    return new Pair<>(tsQueryDataSet, finished);
   }
 
+  /** pair.left holds the serialized TsBlocks; pair.right tells whether the query has finished. */
   // To fetch required amounts of data and combine them through List
-  public static List<ByteBuffer> convertQueryResultByFetchSize(
+  public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize(
       IQueryExecution queryExecution, int fetchSize) throws IoTDBException {
     int rowCount = 0;
     List<ByteBuffer> res = new ArrayList<>();
+    boolean finished = false;
     while (rowCount < fetchSize) {
       Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
       if (!optionalByteBuffer.isPresent()) {
+        finished = true;
         break;
       }
       ByteBuffer byteBuffer = optionalByteBuffer.get();
@@ -397,7 +403,7 @@ public class QueryDataSetUtils {
       }
       rowCount += positionCount;
     }
-    return res;
+    return new Pair<>(res, finished);
   }
 
   public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
index 3077c571be..98a116b008 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
@@ -64,7 +64,8 @@ public class IoTDBRpcDataSet {
   public long queryId;
   public long statementId;
   public boolean ignoreTimeStamp;
-  public boolean isRpcFetchResult;
+  // indicates that there is still more data on the server side; call fetchResults to get it
+  public boolean moreData;
 
   public static final TsBlockSerde serde = new TsBlockSerde();
   public List<ByteBuffer> queryResult;
@@ -81,7 +82,7 @@ public class IoTDBRpcDataSet {
       List<String> columnTypeList,
       Map<String, Integer> columnNameIndex,
       boolean ignoreTimeStamp,
-      boolean isRpcFetchResult,
+      boolean moreData,
       long queryId,
       long statementId,
       IClientRPCService.Iface client,
@@ -97,7 +98,7 @@ public class IoTDBRpcDataSet {
     this.client = client;
     this.fetchSize = fetchSize;
     this.timeout = timeout;
-    this.isRpcFetchResult = isRpcFetchResult;
+    this.moreData = moreData;
     columnSize = columnNameList.size();
 
     this.columnNameList = new ArrayList<>();
@@ -162,7 +163,7 @@ public class IoTDBRpcDataSet {
       List<String> columnTypeList,
       Map<String, Integer> columnNameIndex,
       boolean ignoreTimeStamp,
-      boolean isRpcFetchResult,
+      boolean moreData,
       long queryId,
       long statementId,
       IClientRPCService.Iface client,
@@ -180,7 +181,7 @@ public class IoTDBRpcDataSet {
     this.client = client;
     this.fetchSize = fetchSize;
     this.timeout = timeout;
-    this.isRpcFetchResult = isRpcFetchResult;
+    this.moreData = moreData;
     columnSize = columnNameList.size();
 
     this.columnNameList = new ArrayList<>();
@@ -289,7 +290,7 @@ public class IoTDBRpcDataSet {
             "Cannot close dataset, because of network connection: {} ", e);
       }
     }
-    if (isRpcFetchResult && fetchResults() && hasCachedByteBuffer()) {
+    if (moreData && fetchResults() && hasCachedByteBuffer()) {
       constructOneTsBlock();
       constructOneRow();
       return true;
@@ -309,8 +310,8 @@ public class IoTDBRpcDataSet {
     req.setTimeout(timeout);
     try {
       TSFetchResultsResp resp = client.fetchResultsV2(req);
-
       RpcUtils.verifySuccess(resp.getStatus());
+      moreData = resp.moreData;
       if (!resp.hasResultSet) {
         emptyResultSet = true;
         close();
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 630f6ffdeb..bd2ca927bc 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -377,7 +377,8 @@ public class SessionConnection {
         sessionId,
         execResp.queryResult,
         execResp.isIgnoreTimeStamp(),
-        timeout);
+        timeout,
+        execResp.moreData);
   }
 
   protected void executeNonQueryStatement(String sql)
@@ -439,7 +440,8 @@ public class SessionConnection {
         client,
         sessionId,
         execResp.queryResult,
-        execResp.isIgnoreTimeStamp());
+        execResp.isIgnoreTimeStamp(),
+        execResp.moreData);
   }
 
   protected SessionDataSet executeLastDataQuery(List<String> paths, long time, long timeOut)
@@ -478,7 +480,8 @@ public class SessionConnection {
         client,
         sessionId,
         tsExecuteStatementResp.queryResult,
-        tsExecuteStatementResp.isIgnoreTimeStamp());
+        tsExecuteStatementResp.isIgnoreTimeStamp(),
+        tsExecuteStatementResp.moreData);
   }
 
   protected void insertRecord(TSInsertRecordReq request)
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index f7b725ab73..c07336bb36 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -51,7 +51,8 @@ public class SessionDataSet implements AutoCloseable {
       IClientRPCService.Iface client,
       long sessionId,
       List<ByteBuffer> queryResult,
-      boolean ignoreTimeStamp) {
+      boolean ignoreTimeStamp,
+      boolean moreData) {
     this.ioTDBRpcDataSet =
         new IoTDBRpcDataSet(
             sql,
@@ -59,7 +60,7 @@ public class SessionDataSet implements AutoCloseable {
             columnTypeList,
             columnNameIndex,
             ignoreTimeStamp,
-            true,
+            moreData,
             queryId,
             statementId,
             client,
@@ -80,7 +81,8 @@ public class SessionDataSet implements AutoCloseable {
       long sessionId,
       List<ByteBuffer> queryResult,
       boolean ignoreTimeStamp,
-      long timeout) {
+      long timeout,
+      boolean moreData) {
     this.ioTDBRpcDataSet =
         new IoTDBRpcDataSet(
             sql,
@@ -88,7 +90,7 @@ public class SessionDataSet implements AutoCloseable {
             columnTypeList,
             columnNameIndex,
             ignoreTimeStamp,
-            true,
+            moreData,
             queryId,
             statementId,
             client,
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 61744021c8..0cccfe9ee2 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -68,6 +68,7 @@ struct TSExecuteStatementResp {
   11: optional list<byte> aliasColumns
   12: optional TSTracingInfo tracingInfo
   13: optional list<binary> queryResult
+  14: optional bool moreData
 }
 
 enum TSProtocolVersion {
@@ -176,6 +177,7 @@ struct TSFetchResultsResp{
   4: optional TSQueryDataSet queryDataSet
   5: optional TSQueryNonAlignDataSet nonAlignQueryDataSet
   6: optional list<binary> queryResult
+  7: optional bool moreData
 }
 
 struct TSFetchMetadataResp{