Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/10/19 11:56:54 UTC

[GitHub] [iotdb] JackieTien97 commented on a diff in pull request #7613: [IOTDB-3936]Add an interface in IClientRPCService to directly return bytebuffer instead of TSQueryDataSet

JackieTien97 commented on code in PR #7613:
URL: https://github.com/apache/iotdb/pull/7613#discussion_r998997856


##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java:
##########
@@ -40,6 +42,9 @@ public interface ISourceHandle {
    */
   TsBlock receive();
 
+  /** get the serialized tsblock as the form of bytebuffer. */

Review Comment:
   Add more Javadoc to indicate that this method shares the same iterator as `receive()`: after this method is called, `receive()`'s cursor also moves forward.
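
As an illustration of the shared-cursor behaviour this comment asks to document, here is a minimal sketch. `SharedCursorSketch`, its string blocks, and the method name `getSerializedBlock` are hypothetical stand-ins, not the real `ISourceHandle` API; the point is only the Javadoc wording and the fact that both accessors consume from one queue.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

/** Hypothetical stand-in for a source handle; blocks are plain strings here. */
class SharedCursorSketch {
  private final Queue<String> blocks = new ArrayDeque<>(Arrays.asList("block-0", "block-1"));

  /** Returns the next block, or null if none is left. Advances the shared cursor. */
  String receive() {
    return blocks.poll();
  }

  /**
   * Returns the next block in serialized form, or null if none is left.
   *
   * <p>This method shares its cursor with {@link #receive()}: a block returned here is consumed
   * and will not be returned by a later call to {@code receive()}.
   */
  ByteBuffer getSerializedBlock() {
    String next = blocks.poll();
    return next == null ? null : ByteBuffer.wrap(next.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    SharedCursorSketch handle = new SharedCursorSketch();
    handle.getSerializedBlock(); // consumes the first block
    System.out.println(handle.receive()); // returns the block after it
  }
}
```

Mixing the two accessors on one handle therefore never yields the same block twice, which is the property the Javadoc should state.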



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java:
##########
@@ -103,6 +107,36 @@ public TsBlock receive() {
     }
   }
 
+  @Override
+  public ByteBuffer getSerializedTsBlock() {
+    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+      checkState();
+
+      if (!queue.isBlocked().isDone()) {
+        throw new IllegalStateException("Source handle is blocked.");
+      }
+      TsBlock tsBlock;
+      synchronized (queue) {
+        tsBlock = queue.remove();
+      }
+      if (tsBlock != null) {
+        currSequenceId++;
+        logger.info(
+            "[GetTsBlockFromQueue] TsBlock:{} size:{}",
+            currSequenceId,
+            tsBlock.getRetainedSizeInBytes());
+      }
+      checkAndInvokeOnFinished();
+      if (tsBlock != null) {
+        return new TsBlockSerde().serialize(tsBlock);
+      } else {
+        return null;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }

Review Comment:
   Too much code is duplicated from `receive()`; try to extract the shared code.
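
One possible shape for that extraction, as a sketch only: the dequeue and sequence bookkeeping move into a private helper, and each accessor adds just its own last step. `SharedPollSketch`, `pollNextBlock`, and the `int[]` blocks are hypothetical; the real class also performs state checks and listener callbacks that are omitted here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for LocalSourceHandle; blocks are plain int arrays here. */
class SharedPollSketch {
  private final Queue<int[]> queue = new ArrayDeque<>();
  private int currSequenceId = 0;

  void enqueue(int[] block) {
    synchronized (queue) {
      queue.add(block);
    }
  }

  /** Steps common to both accessors: dequeue and sequence bookkeeping. */
  private int[] pollNextBlock() {
    int[] block;
    synchronized (queue) {
      block = queue.poll(); // null when the queue is empty
    }
    if (block != null) {
      currSequenceId++;
    }
    return block;
  }

  int[] receive() {
    return pollNextBlock();
  }

  ByteBuffer getSerializedBlock() {
    int[] block = pollNextBlock();
    if (block == null) {
      return null;
    }
    // Only the serialization step differs from receive().
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * block.length);
    for (int value : block) {
      buffer.putInt(value);
    }
    buffer.flip();
    return buffer;
  }

  int currentSequenceId() {
    return currSequenceId;
  }

  public static void main(String[] args) {
    SharedPollSketch handle = new SharedPollSketch();
    handle.enqueue(new int[] {1, 2});
    handle.enqueue(new int[] {3, 4, 5});
    System.out.println(handle.receive().length);
    System.out.println(handle.getSerializedBlock().remaining());
  }
}
```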



##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java:
##########
@@ -392,6 +393,60 @@ public Optional<TsBlock> getBatchResult() throws IoTDBException {
     }
   }
 
+  @Override
+  public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {

Review Comment:
   Extract the common code.



##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java:
##########
@@ -60,6 +64,20 @@ public synchronized TsBlock receive() {
     return result;
   }
 
+  @Override
+  public ByteBuffer getSerializedTsBlock() {
+    hasNext = false;
+    if (result.isEmpty()) {
+      return null;
+    } else {
+      try {
+        return new TsBlockSerde().serialize(result);
+      } catch (IOException e) {
+        throw new RuntimeException(e);

Review Comment:
   Try not to throw `RuntimeException`.



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java:
##########
@@ -315,6 +315,38 @@ public TSServiceImpl() {
     serviceProvider = IoTDB.serviceProvider;
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeRawDataQueryV2(TSRawDataQueryReq req) throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeLastDataQueryV2(TSLastDataQueryReq req) throws TException {
+    return null;

Review Comment:
   ```suggestion
       throw new UnsupportedOperationException();
   ```
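
A small sketch of why throwing is usually preferable to returning `null` from an unsupported stub: the caller gets an explicit, descriptive failure at the call site instead of a later `NullPointerException`. The interface and class names below are hypothetical and unrelated to the real Thrift-generated service.

```java
class UnsupportedStubSketch {
  /** Hypothetical service interface with a newer "V2" call. */
  interface QueryService {
    String executeStatementV2(String sql);
  }

  /** Legacy implementation that does not support the V2 call. */
  static class LegacyService implements QueryService {
    @Override
    public String executeStatementV2(String sql) {
      // Fail loudly rather than handing the caller a null response.
      throw new UnsupportedOperationException("executeStatementV2 is not supported");
    }
  }

  /** A caller can report the unsupported call explicitly. */
  static String describe(QueryService service) {
    try {
      return service.executeStatementV2("show version");
    } catch (UnsupportedOperationException e) {
      return "unsupported: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(new LegacyService()));
  }
}
```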



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -158,6 +159,270 @@ public ClientRPCServiceImpl() {
     }
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  // To fetch required amounts of data and combine them through List
+  private List<ByteBuffer> convertQueryResultByFetchSize(
+      IQueryExecution queryExecution, int fetchSize) {
+    int rowCount = 0;
+    List<ByteBuffer> res = new LinkedList<>();
+    while (rowCount < fetchSize) {
+      try {
+        Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
+        if (!optionalByteBuffer.isPresent()) {
+          break;
+        }
+
+        ByteBuffer byteBuffer = optionalByteBuffer.get();
+        byteBuffer.mark();
+        int valueColumnCount = byteBuffer.getInt();
+        if (valueColumnCount == 0) {
+          continue;
+        }

Review Comment:
   It's OK for `valueColumnCount` to be zero; don't skip this case. We may support queries that need only the time column.
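
A sketch of reading the row count without special-casing zero value columns. It assumes the header layout implied by the quoted loop (an `int` column count, one type byte per value column, then an `int` position count); `TimeOnlyBlockSketch`, `peekPositionCount`, and `header` are hypothetical helpers, not part of IoTDB.

```java
import java.nio.ByteBuffer;
import java.util.List;

class TimeOnlyBlockSketch {
  /** Reads positionCount from the assumed header and restores the buffer position. */
  static int peekPositionCount(ByteBuffer buffer) {
    buffer.mark();
    int valueColumnCount = buffer.getInt();
    // Skip one type byte per value column; with zero columns this loop does nothing.
    for (int i = 0; i < valueColumnCount; i++) {
      buffer.get();
    }
    int positionCount = buffer.getInt();
    buffer.reset();
    return positionCount;
  }

  /** Builds a header-only buffer in the assumed layout, for demonstration. */
  static ByteBuffer header(int valueColumnCount, int positionCount) {
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + valueColumnCount + Integer.BYTES);
    buffer.putInt(valueColumnCount);
    for (int i = 0; i < valueColumnCount; i++) {
      buffer.put((byte) 0); // placeholder type byte
    }
    buffer.putInt(positionCount);
    buffer.flip();
    return buffer;
  }

  /** Sums rows over all blocks, including blocks that carry only a time column. */
  static int countRows(List<ByteBuffer> blocks) {
    int rows = 0;
    for (ByteBuffer block : blocks) {
      rows += peekPositionCount(block);
    }
    return rows;
  }

  public static void main(String[] args) {
    System.out.println(peekPositionCount(header(0, 5)));
  }
}
```

Because the skip loop is already a no-op for zero columns, a time-only block still contributes its rows to the fetch-size accounting.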



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -158,6 +159,270 @@ public ClientRPCServiceImpl() {
     }
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  // To fetch required amounts of data and combine them through List
+  private List<ByteBuffer> convertQueryResultByFetchSize(

Review Comment:
   Move this method to `QueryDataSetUtils`.



##########
session/src/main/java/org/apache/iotdb/session/SessionDataSet.java:
##########
@@ -134,32 +132,31 @@ private RowRecord constructRowRecordFromValueArray() throws StatementExecutionEx
               - START_INDEX;
 
       if (!ioTDBRpcDataSet.isNull(datasetColumnIndex)) {
-        byte[] valueBytes = ioTDBRpcDataSet.values[loc];
         TSDataType dataType = ioTDBRpcDataSet.columnTypeDeduplicatedList.get(loc);
         field = new Field(dataType);
         switch (dataType) {
           case BOOLEAN:
-            boolean booleanValue = BytesUtils.bytesToBool(valueBytes);
+            boolean booleanValue = ioTDBRpcDataSet.getBoolean(datasetColumnIndex);
             field.setBoolV(booleanValue);
             break;
           case INT32:
-            int intValue = BytesUtils.bytesToInt(valueBytes);
+            int intValue = iterator().getInt(datasetColumnIndex);

Review Comment:
   Why is `INT32` handled differently from the other types?



##########
session/src/test/java/org/apache/iotdb/session/SessionDatasetTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class SessionDatasetTest {

Review Comment:
   No need to add this test.



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java:
##########
@@ -315,6 +315,38 @@ public TSServiceImpl() {
     serviceProvider = IoTDB.serviceProvider;
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;

Review Comment:
   ```suggestion
       throw new UnsupportedOperationException();
   ```



##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java:
##########
@@ -172,6 +175,19 @@ public Optional<TsBlock> getBatchResult() {
     return Optional.empty();
   }
 
+  @Override
+  public Optional<ByteBuffer> getByteBufferBatchResult() {
+    if (!resultSetConsumed) {
+      resultSetConsumed = true;
+      try {
+        return Optional.of(new TsBlockSerde().serialize(resultSet));
+      } catch (IOException e) {
+        throw new RuntimeException(e);

Review Comment:
   Try not to wrap the exception in `RuntimeException`.



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java:
##########
@@ -315,6 +315,38 @@ public TSServiceImpl() {
     serviceProvider = IoTDB.serviceProvider;
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;

Review Comment:
   ```suggestion
       throw new UnsupportedOperationException();
   ```



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java:
##########
@@ -103,6 +107,36 @@ public TsBlock receive() {
     }
   }
 
+  @Override
+  public ByteBuffer getSerializedTsBlock() {
+    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+      checkState();
+
+      if (!queue.isBlocked().isDone()) {
+        throw new IllegalStateException("Source handle is blocked.");
+      }
+      TsBlock tsBlock;
+      synchronized (queue) {
+        tsBlock = queue.remove();
+      }
+      if (tsBlock != null) {
+        currSequenceId++;
+        logger.info(
+            "[GetTsBlockFromQueue] TsBlock:{} size:{}",
+            currSequenceId,
+            tsBlock.getRetainedSizeInBytes());
+      }
+      checkAndInvokeOnFinished();
+      if (tsBlock != null) {
+        return new TsBlockSerde().serialize(tsBlock);
+      } else {
+        return null;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);

Review Comment:
   Better not to wrap a checked exception in an unchecked one. You can add `throws IOException` to this method's signature and then add a catch block in the caller.
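
A minimal sketch of that pattern, using only JDK classes: the serializing method declares `IOException`, and the caller decides how to report it. `CheckedExceptionSketch`, `serialize`, and `fetch` are hypothetical names; the real code would call `TsBlockSerde` instead of `DataOutputStream`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

class CheckedExceptionSketch {
  /** Declares IOException instead of wrapping it in RuntimeException. */
  static ByteBuffer serialize(int[] values) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(values.length);
      for (int value : values) {
        out.writeInt(value);
      }
    }
    return ByteBuffer.wrap(bytes.toByteArray());
  }

  /** The caller owns the catch block and chooses how to surface the failure. */
  static String fetch(int[] values) {
    try {
      return "ok: " + serialize(values).remaining() + " bytes";
    } catch (IOException e) {
      return "serialization failed: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(fetch(new int[] {1, 2}));
  }
}
```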



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java:
##########
@@ -315,6 +315,38 @@ public TSServiceImpl() {
     serviceProvider = IoTDB.serviceProvider;
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) throws TException {
+    return null;

Review Comment:
   ```suggestion
       throw new UnsupportedOperationException();
   ```



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java:
##########
@@ -315,6 +315,38 @@ public TSServiceImpl() {
     serviceProvider = IoTDB.serviceProvider;
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeRawDataQueryV2(TSRawDataQueryReq req) throws TException {
+    return null;

Review Comment:
   ```suggestion
       throw new UnsupportedOperationException();
   ```



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java:
##########
@@ -146,6 +148,38 @@ public synchronized TsBlock receive() {
     }
   }
 
+  @Override
+  public ByteBuffer getSerializedTsBlock() {
+    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+
+      checkState();
+
+      if (!blocked.isDone()) {
+        throw new IllegalStateException("Source handle is blocked.");
+      }
+
+      ByteBuffer bfBlock = sequenceIdToBfBlock.remove(currSequenceId);
+      if (bfBlock == null) {
+        return null;
+      }
+      long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+      logger.info("[GetTsBlockFromBuffer] sequenceId:{}, size:{}", currSequenceId, retainedSize);
+      currSequenceId += 1;
+      bufferRetainedSizeInBytes -= retainedSize;
+      localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), retainedSize);
+
+      if (sequenceIdToBfBlock.isEmpty() && !isFinished()) {
+        logger.info("[WaitForMoreTsBlock]");
+        blocked = SettableFuture.create();
+      }
+      if (isFinished()) {
+        sourceHandleListener.onFinished(this);
+      }
+      trySubmitGetDataBlocksTask();
+      return bfBlock;
+    }
+  }

Review Comment:
   Too much code is duplicated from `receive()`; try to extract the shared code.



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java:
##########
@@ -315,6 +315,38 @@ public TSServiceImpl() {
     serviceProvider = IoTDB.serviceProvider;
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeRawDataQueryV2(TSRawDataQueryReq req) throws TException {
+    return null;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeLastDataQueryV2(TSLastDataQueryReq req) throws TException {
+    return null;
+  }
+
+  @Override
+  public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) throws TException {
+    return null;

Review Comment:
   ```suggestion
       throw new UnsupportedOperationException();
   ```



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -158,6 +159,270 @@ public ClientRPCServiceImpl() {
     }
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  // To fetch required amounts of data and combine them through List
+  private List<ByteBuffer> convertQueryResultByFetchSize(
+      IQueryExecution queryExecution, int fetchSize) {
+    int rowCount = 0;
+    List<ByteBuffer> res = new LinkedList<>();
+    while (rowCount < fetchSize) {
+      try {
+        Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
+        if (!optionalByteBuffer.isPresent()) {
+          break;
+        }
+
+        ByteBuffer byteBuffer = optionalByteBuffer.get();
+        byteBuffer.mark();
+        int valueColumnCount = byteBuffer.getInt();
+        if (valueColumnCount == 0) {
+          continue;
+        }
+        for (int i = 0; i < valueColumnCount; i++) {
+          byteBuffer.get();
+        }
+        int positionCount = byteBuffer.getInt();
+        byteBuffer.reset();
+
+        res.add(byteBuffer);
+        rowCount += positionCount;
+
+      } catch (IoTDBException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return res;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) throws TException {

Review Comment:
   Extract the common code.



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -158,6 +159,270 @@ public ClientRPCServiceImpl() {
     }
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  // To fetch required amounts of data and combine them through List
+  private List<ByteBuffer> convertQueryResultByFetchSize(
+      IQueryExecution queryExecution, int fetchSize) {
+    int rowCount = 0;
+    List<ByteBuffer> res = new LinkedList<>();
+    while (rowCount < fetchSize) {
+      try {
+        Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
+        if (!optionalByteBuffer.isPresent()) {
+          break;
+        }
+
+        ByteBuffer byteBuffer = optionalByteBuffer.get();
+        byteBuffer.mark();
+        int valueColumnCount = byteBuffer.getInt();
+        if (valueColumnCount == 0) {
+          continue;
+        }
+        for (int i = 0; i < valueColumnCount; i++) {
+          byteBuffer.get();
+        }
+        int positionCount = byteBuffer.getInt();
+        byteBuffer.reset();
+
+        res.add(byteBuffer);
+        rowCount += positionCount;
+
+      } catch (IoTDBException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return res;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) throws TException {
+    String statement = req.getStatement();
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s =
+          StatementGenerator.createStatement(
+              statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
+
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              statement,
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          resp.setQueryResult(convertQueryResultByFetchSize(queryExecution, req.getFetchSize()));
+        } else {
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+      }
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executeRawDataQueryV2(TSRawDataQueryReq req) throws TException {

Review Comment:
   Extract the common code.



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -158,6 +159,270 @@ public ClientRPCServiceImpl() {
     }
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  // To fetch required amounts of data and combine them through List
+  private List<ByteBuffer> convertQueryResultByFetchSize(
+      IQueryExecution queryExecution, int fetchSize) {
+    int rowCount = 0;
+    List<ByteBuffer> res = new LinkedList<>();
+    while (rowCount < fetchSize) {
+      try {
+        Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
+        if (!optionalByteBuffer.isPresent()) {
+          break;
+        }
+
+        ByteBuffer byteBuffer = optionalByteBuffer.get();
+        byteBuffer.mark();
+        int valueColumnCount = byteBuffer.getInt();
+        if (valueColumnCount == 0) {
+          continue;
+        }
+        for (int i = 0; i < valueColumnCount; i++) {
+          byteBuffer.get();
+        }
+        int positionCount = byteBuffer.getInt();
+        byteBuffer.reset();
+
+        res.add(byteBuffer);
+        rowCount += positionCount;
+
+      } catch (IoTDBException e) {
+        throw new RuntimeException(e);

Review Comment:
   Don't use `RuntimeException`; just rethrow it.



##########
service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java:
##########
@@ -69,11 +64,14 @@ public class IoTDBRpcDataSet {
   public long statementId;
   public boolean ignoreTimeStamp;
 
-  public int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
-
-  public TSQueryDataSet tsQueryDataSet = null;
-  public byte[] time; // used to cache the current time value
-  public byte[] currentBitmap; // used to cache the current bitmap for every column
+  public TsBlockSerde serde;

Review Comment:
   ```suggestion
     public static final TsBlockSerde serde = new TsBlockSerde();
   ```
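
The suggestion above replaces a per-instance field with one shared constant. A sketch of that idea follows, under the assumption that the serializer keeps no per-call state and is therefore safe to share (this is not verified for `TsBlockSerde` here); `Serde` and `DataSet` are hypothetical classes.

```java
import java.nio.ByteBuffer;

class SharedSerdeSketch {
  /** Hypothetical stateless serializer: no fields, so one instance can serve every caller. */
  static final class Serde {
    byte[] serialize(long value) {
      return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    long deserialize(byte[] bytes) {
      return ByteBuffer.wrap(bytes).getLong();
    }
  }

  /** Hypothetical data set that reuses the shared serializer instead of allocating its own. */
  static final class DataSet {
    static final Serde SERDE = new Serde();

    long roundTrip(long value) {
      return SERDE.deserialize(SERDE.serialize(value));
    }
  }

  public static void main(String[] args) {
    System.out.println(new DataSet().roundTrip(42L));
  }
}
```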



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -158,6 +159,270 @@ public ClientRPCServiceImpl() {
     }
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  // To fetch required amounts of data and combine them through List
+  private List<ByteBuffer> convertQueryResultByFetchSize(
+      IQueryExecution queryExecution, int fetchSize) {
+    int rowCount = 0;
+    List<ByteBuffer> res = new LinkedList<>();
+    while (rowCount < fetchSize) {
+      try {
+        Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
+        if (!optionalByteBuffer.isPresent()) {
+          break;
+        }
+
+        ByteBuffer byteBuffer = optionalByteBuffer.get();
+        byteBuffer.mark();
+        int valueColumnCount = byteBuffer.getInt();
+        if (valueColumnCount == 0) {
+          continue;
+        }
+        for (int i = 0; i < valueColumnCount; i++) {
+          byteBuffer.get();
+        }
+        int positionCount = byteBuffer.getInt();
+        byteBuffer.reset();
+
+        res.add(byteBuffer);
+        rowCount += positionCount;
+
+      } catch (IoTDBException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return res;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) throws TException {
+    String statement = req.getStatement();
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s =
+          StatementGenerator.createStatement(
+              statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
+
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              statement,
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          resp.setQueryResult(convertQueryResultByFetchSize(queryExecution, req.getFetchSize()));
+        } else {
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+      }
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executeRawDataQueryV2(TSRawDataQueryReq req) throws TException {
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s =
+          StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req);
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("error code: " + result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          resp.setQueryResult(convertQueryResultByFetchSize(queryExecution, req.fetchSize));
+        } else {
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+      }
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executeLastDataQueryV2(TSLastDataQueryReq req) throws TException {
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s =
+          StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute Last Data Query: {}", req.sessionId, req);
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("error code: " + result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          resp.setQueryResult(convertQueryResultByFetchSize(queryExecution, req.fetchSize));
+        } else {
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+      }
+    }
+  }
+
+  @Override
+  public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) throws TException {

Review Comment:
   Extract the common code shared by these V2 methods.
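One piece that repeats verbatim in each V2 method is the branch that builds the response: attach the fetched result when the execution is a query, otherwise return a status-only response. A minimal sketch of pulling that branch into a helper follows. `ResponseSketch`, `ResponseBuilderSketch` and `buildResponse` are hypothetical names used for illustration, and the types are simplified stand-ins rather than the real `TSExecuteStatementResp` or `IQueryExecution`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-ins for illustration only; these are not IoTDB classes.
final class ResponseSketch {
  final String status;
  final List<String> queryResult; // null when the statement is not a query

  ResponseSketch(String status, List<String> queryResult) {
    this.status = status;
    this.queryResult = queryResult;
  }
}

final class ResponseBuilderSketch {

  /**
   * The branch repeated in every V2 method: attach the fetched result for queries, otherwise
   * return a status-only response. The fetch is passed in as a Supplier so that it only runs
   * on the query branch.
   */
  static ResponseSketch buildResponse(
      boolean isQuery, String status, Supplier<List<String>> fetchResult) {
    if (isQuery) {
      return new ResponseSketch(status, fetchResult.get());
    }
    return new ResponseSketch(status, null);
  }

  public static void main(String[] args) {
    ResponseSketch query =
        buildResponse(true, "SUCCESS", () -> Arrays.asList("block-0", "block-1"));
    ResponseSketch update = buildResponse(false, "SUCCESS", () -> Arrays.asList("unused"));
    System.out.println(query.queryResult.size());
    System.out.println(update.queryResult == null);
  }
}
```

With a helper of this shape, each entry point would only pass its own fetch size and status, and the null check on the query execution could live in one place.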



##########
service-rpc/pom.xml:
##########
@@ -62,6 +62,14 @@
             <groupId>org.xerial.snappy</groupId>
             <artifactId>snappy-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.openjdk.jol</groupId>
+            <artifactId>jol-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>slice</artifactId>
+        </dependency>

Review Comment:
   Why do we need to add these dependencies?



##########
service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java:
##########
@@ -69,11 +64,14 @@ public class IoTDBRpcDataSet {
   public long statementId;
   public boolean ignoreTimeStamp;
 
-  public int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
-
-  public TSQueryDataSet tsQueryDataSet = null;
-  public byte[] time; // used to cache the current time value
-  public byte[] currentBitmap; // used to cache the current bitmap for every column
+  public TsBlockSerde serde;
+  public List<ByteBuffer> queryResult;
+  public TsBlock curTsBlock;
+  public Long time; // used to cache the current time value

Review Comment:
   ```suggestion
     public long time; // used to cache the current time value
   ```
   Actually, there is no need to store this at all: whenever we need to return the current time, we can read it directly from `curTsBlock`.
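To illustrate the second point, here is a rough sketch of a cursor that keeps only a row index and reads the time from the current block on demand. `TimeBlockSketch` and `DataSetCursorSketch` are hypothetical stand-ins, and the accessor names are assumptions, not the actual `TsBlock` or `IoTDBRpcDataSet` API. Returning a primitive `long` also avoids the boxing and possible `null` that a `Long` field allows.

```java
// Hypothetical stand-ins for illustration only; the real TsBlock and IoTDBRpcDataSet differ.
final class TimeBlockSketch {
  private final long[] times;

  TimeBlockSketch(long... times) {
    this.times = times;
  }

  int getPositionCount() {
    return times.length;
  }

  long getTimeByIndex(int index) {
    return times[index];
  }
}

final class DataSetCursorSketch {
  private final TimeBlockSketch curTsBlock;
  private int rowIndex = -1; // position of the current row inside curTsBlock

  DataSetCursorSketch(TimeBlockSketch curTsBlock) {
    this.curTsBlock = curTsBlock;
  }

  /** Advances to the next row; returns false once the block is exhausted. */
  boolean next() {
    if (rowIndex + 1 >= curTsBlock.getPositionCount()) {
      return false;
    }
    rowIndex++;
    return true;
  }

  /** No cached field: the time is read from the block on demand as a primitive long. */
  long currentTime() {
    return curTsBlock.getTimeByIndex(rowIndex);
  }
}
```

Because the time is derived from the block and the index, there is no second piece of state that can fall out of sync when the cursor moves to a new block.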



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -158,6 +159,270 @@ public ClientRPCServiceImpl() {
     }
   }
 
+  @Override
+  public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  @Override
+  public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+      throws TException {
+    return executeStatementV2(req);
+  }
+
+  // To fetch required amounts of data and combine them through List
+  private List<ByteBuffer> convertQueryResultByFetchSize(
+      IQueryExecution queryExecution, int fetchSize) {
+    int rowCount = 0;
+    List<ByteBuffer> res = new LinkedList<>();
+    while (rowCount < fetchSize) {
+      try {
+        Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
+        if (!optionalByteBuffer.isPresent()) {
+          break;
+        }
+
+        ByteBuffer byteBuffer = optionalByteBuffer.get();
+        byteBuffer.mark();
+        int valueColumnCount = byteBuffer.getInt();
+        if (valueColumnCount == 0) {
+          continue;
+        }
+        for (int i = 0; i < valueColumnCount; i++) {
+          byteBuffer.get();
+        }
+        int positionCount = byteBuffer.getInt();
+        byteBuffer.reset();
+
+        res.add(byteBuffer);
+        rowCount += positionCount;
+
+      } catch (IoTDBException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return res;
+  }
+
+  @Override
+  public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) throws TException {
+    String statement = req.getStatement();
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s =
+          StatementGenerator.createStatement(
+              statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+      if (s == null) {
+        return RpcUtils.getTSExecuteStatementResp(
+            RpcUtils.getStatus(
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
+      }
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
+
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              statement,
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          resp.setQueryResult(convertQueryResultByFetchSize(queryExecution, req.getFetchSize()));
+        } else {
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+      }
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executeRawDataQueryV2(TSRawDataQueryReq req) throws TException {
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s =
+          StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req);
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("error code: " + result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          resp.setQueryResult(convertQueryResultByFetchSize(queryExecution, req.fetchSize));
+        } else {
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+      }
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executeLastDataQueryV2(TSLastDataQueryReq req) throws TException {

Review Comment:
   Extract the common code; this method largely duplicates `executeRawDataQueryV2`.
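The raw-data and last-data methods share the same try/catch/finally skeleton and differ mainly in how the statement is created and which operation type is reported. One possible way to factor that out is a small template method that takes the varying part as a callback. This is only a sketch under that assumption: `QueryTemplateSketch`, `executeWithTemplate` and the string-based request and response are hypothetical simplifications, not the real Thrift types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-ins for illustration only; these are not IoTDB classes.
final class QueryTemplateSketch {

  /** Collects what the shared epilogue recorded, standing in for the latency metrics. */
  static final List<String> RECORDED = new ArrayList<>();

  /**
   * Shared skeleton of the V2 methods: run the per-request body, turn any exception into an
   * error response tagged with the operation type, and always record the operation afterwards.
   */
  static String executeWithTemplate(String operationType, Callable<String> body) {
    long startTime = System.currentTimeMillis();
    try {
      return body.call();
    } catch (Exception e) {
      return "ERROR(" + operationType + "): " + e.getMessage();
    } finally {
      long costTime = System.currentTimeMillis() - startTime;
      RECORDED.add(operationType + " took " + costTime + " ms");
    }
  }

  // Each entry point now only states what is specific to it.
  static String executeRawDataQuery(String req) {
    return executeWithTemplate("EXECUTE_RAW_DATA_QUERY", () -> "raw result for " + req);
  }

  static String executeLastDataQuery(String req) {
    return executeWithTemplate("EXECUTE_LAST_DATA_QUERY", () -> "last result for " + req);
  }
}
```

The slow-query logging and latency bookkeeping then live in one `finally` block, so a later change to either only has to be made once.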



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java:
##########
@@ -66,8 +66,8 @@ public class SourceHandle implements ISourceHandle {
   private final TsBlockSerde serde;
   private final SourceHandleListener sourceHandleListener;
 
-  private final Map<Integer, TsBlock> sequenceIdToTsBlock = new HashMap<>();
   private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
+  private final Map<Integer, ByteBuffer> sequenceIdToBfBlock = new HashMap<>();

Review Comment:
   ```suggestion
     private final Map<Integer, ByteBuffer> sequenceIdToTsBlock = new HashMap<>();
   ```


