You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/11/02 13:47:30 UTC
[iotdb] branch master updated: [IOTDB-3773] [IOTDB-4831] Optimize the rpc call numbers in query processing & make query cost time print more accurate (#7875)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 695dd65fc7 [IOTDB-3773] [IOTDB-4831] Optimize the rpc call numbers in query processing & make query cost time print more accurate (#7875)
695dd65fc7 is described below
commit 695dd65fc75a84c1f5f9db6ecc43ec0e4451d9e2
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Nov 2 21:47:24 2022 +0800
[IOTDB-3773] [IOTDB-4831] Optimize the rpc call numbers in query processing & make query cost time print more accurate (#7875)
---
.../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java | 72 ++++++------
.../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java | 12 +-
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 5 +-
.../apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java | 127 +--------------------
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 21 +++-
.../db/mpp/plan/execution/IQueryExecution.java | 4 +
.../db/mpp/plan/execution/QueryExecution.java | 10 ++
.../mpp/plan/execution/config/ConfigExecution.java | 10 ++
.../service/thrift/impl/ClientRPCServiceImpl.java | 118 ++++++++++++-------
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 12 +-
.../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 30 ++---
.../apache/iotdb/session/SessionConnection.java | 9 +-
.../org/apache/iotdb/session/SessionDataSet.java | 10 +-
thrift/src/main/thrift/client.thrift | 2 +
14 files changed, 193 insertions(+), 249 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
index f900c32c99..2977eb60dc 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
@@ -606,12 +606,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -651,12 +651,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -706,7 +706,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -952,7 +952,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -1061,7 +1061,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -1116,12 +1116,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1230,12 +1230,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1330,7 +1330,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -1399,7 +1399,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -1452,12 +1452,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1501,12 +1501,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1705,7 +1705,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -1761,12 +1761,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1806,12 +1806,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1952,7 +1952,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2006,12 +2006,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -2047,12 +2047,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -2171,7 +2171,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2211,7 +2211,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2383,7 +2383,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2560,7 +2560,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2763,7 +2763,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2806,12 +2806,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -2860,12 +2860,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
index 1222ad128e..8fc39b31e3 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
@@ -60,8 +60,6 @@ public class IoTDBJDBCResultSet implements ResultSet {
protected List<String> columnTypeList;
protected IoTDBRpcDataSet ioTDBRpcDataSet;
protected IoTDBTracingInfo ioTDBRpcTracingInfo;
- private boolean isRpcFetchResult = true;
-
private String operationType = "";
private List<String> columns = null;
private List<String> sgColumns = null;
@@ -82,7 +80,8 @@ public class IoTDBJDBCResultSet implements ResultSet {
String operationType,
List<String> columns,
List<String> sgColumns,
- BitSet aliasColumnMap)
+ BitSet aliasColumnMap,
+ boolean moreData)
throws SQLException {
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
@@ -91,7 +90,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
columnTypeList,
columnNameIndex,
ignoreTimeStamp,
- true,
+ moreData,
queryId,
((IoTDBStatement) statement).getStmtId(),
client,
@@ -125,7 +124,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
List<ByteBuffer> dataSet,
TSTracingInfo tracingInfo,
long timeout,
- boolean isRpcFetchResult)
+ boolean moreData)
throws SQLException {
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
@@ -134,7 +133,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
columnTypeList,
columnNameIndex,
ignoreTimeStamp,
- isRpcFetchResult,
+ moreData,
queryId,
((IoTDBStatement) statement).getStmtId(),
client,
@@ -144,7 +143,6 @@ public class IoTDBJDBCResultSet implements ResultSet {
timeout);
this.statement = statement;
this.columnTypeList = columnTypeList;
- this.isRpcFetchResult = isRpcFetchResult;
if (tracingInfo != null) {
ioTDBRpcTracingInfo = new IoTDBTracingInfo();
ioTDBRpcTracingInfo.setTsTracingInfo(tracingInfo);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 3fb925e46b..b3633436ef 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -302,7 +302,7 @@ public class IoTDBStatement implements Statement {
execResp.queryResult,
execResp.tracingInfo,
execReq.timeout,
- true);
+ execResp.moreData);
}
return true;
}
@@ -457,7 +457,8 @@ public class IoTDBStatement implements Statement {
execResp.operationType,
execResp.columns,
execResp.sgColumns,
- aliasColumn);
+ aliasColumn,
+ execResp.moreData);
}
return resultSet;
}
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java
index defd180750..e4437c9d0c 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
@@ -39,8 +38,6 @@ import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
@@ -51,7 +48,6 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import static org.mockito.Matchers.any;
@@ -248,127 +244,7 @@ public class IoTDBJDBCResultSetTest {
}
// The client get TSQueryDataSet at the first request
- verify(fetchResultsResp, times(1)).getStatus();
- }
-
- // fake the first-time fetched result of 'testSql' from an IoTDB server
- private TSQueryDataSet FakedFirstFetchResult() throws IOException {
- List<TSDataType> tsDataTypeList = new ArrayList<>();
- tsDataTypeList.add(TSDataType.FLOAT); // root.vehicle.d0.s2
- tsDataTypeList.add(TSDataType.INT64); // root.vehicle.d0.s1
- tsDataTypeList.add(TSDataType.INT32); // root.vehicle.d0.s0
-
- Object[][] input = {
- {
- 2L, 2.22F, 40000L, null,
- },
- {
- 3L, 3.33F, null, null,
- },
- {
- 4L, 4.44F, null, null,
- },
- {
- 50L, null, 50000L, null,
- },
- {
- 100L, null, 199L, null,
- },
- {
- 101L, null, 199L, null,
- },
- {
- 103L, null, 199L, null,
- },
- {
- 105L, 11.11F, 199L, 33333,
- },
- {
- 1000L, 1000.11F, 55555L, 22222,
- }
- };
-
- int columnNum = tsDataTypeList.size();
- TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
- // one time column and each value column has a actual value buffer and a bitmap value to
- // indicate whether it is a null
- int columnNumWithTime = columnNum * 2 + 1;
- DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
- ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
- for (int i = 0; i < columnNumWithTime; i++) {
- byteArrayOutputStreams[i] = new ByteArrayOutputStream();
- dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
- }
-
- int rowCount = input.length;
- int[] valueOccupation = new int[columnNum];
- // used to record a bitmap for every 8 row record
- int[] bitmap = new int[columnNum];
- for (int i = 0; i < rowCount; i++) {
- Object[] row = input[i];
- // use columnOutput to write byte array
- dataOutputStreams[0].writeLong((long) row[0]);
- for (int k = 0; k < columnNum; k++) {
- Object value = row[1 + k];
- DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; // DO NOT FORGET +1
- if (value == null) {
- bitmap[k] = (bitmap[k] << 1);
- } else {
- bitmap[k] = (bitmap[k] << 1) | 0x01;
- if (k == 0) { // TSDataType.FLOAT
- dataOutputStream.writeFloat((float) value);
- valueOccupation[k] += 4;
- } else if (k == 1) { // TSDataType.INT64
- dataOutputStream.writeLong((long) value);
- valueOccupation[k] += 8;
- } else { // TSDataType.INT32
- dataOutputStream.writeInt((int) value);
- valueOccupation[k] += 4;
- }
- }
- }
- if (i % 8 == 7) {
- for (int j = 0; j < bitmap.length; j++) {
- DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (j + 1)];
- dataBitmapOutputStream.writeByte(bitmap[j]);
- // we should clear the bitmap every 8 row record
- bitmap[j] = 0;
- }
- }
- }
-
- // feed the remaining bitmap
- for (int j = 0; j < bitmap.length; j++) {
- DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (j + 1)];
- dataBitmapOutputStream.writeByte(bitmap[j] << (8 - rowCount % 8));
- }
-
- // calculate the time buffer size
- int timeOccupation = rowCount * 8;
- ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
- timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
- timeBuffer.flip();
- tsQueryDataSet.setTime(timeBuffer);
-
- // calculate the bitmap buffer size
- int bitmapOccupation = rowCount / 8 + 1;
-
- List<ByteBuffer> bitmapList = new LinkedList<>();
- List<ByteBuffer> valueList = new LinkedList<>();
- for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
- ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
- valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
- valueBuffer.flip();
- valueList.add(valueBuffer);
-
- ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
- bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
- bitmapBuffer.flip();
- bitmapList.add(bitmapBuffer);
- }
- tsQueryDataSet.setBitmapList(bitmapList);
- tsQueryDataSet.setValueList(valueList);
- return tsQueryDataSet;
+ verify(fetchResultsResp, times(0)).getStatus();
}
private void constructObjectList(List<Object> standardObject) {
@@ -406,6 +282,7 @@ public class IoTDBJDBCResultSetTest {
}
}
+ // fake the first-time fetched result of 'testSql' from an IoTDB server
private List<ByteBuffer> FakedFirstFetchTsBlockResult() {
List<TSDataType> tsDataTypeList = new ArrayList<>();
tsDataTypeList.add(TSDataType.FLOAT); // root.vehicle.d0.s2
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 3095540a53..3a3002850e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -59,7 +60,10 @@ public class Coordinator {
private static final String COORDINATOR_WRITE_EXECUTOR_NAME = "MPPCoordinatorWrite";
private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
- private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+ private static final Logger SLOW_SQL_LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME);
private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
INTERNAL_SERVICE_CLIENT_MANAGER =
@@ -135,7 +139,7 @@ public class Coordinator {
queryContext,
partitionFetcher,
schemaFetcher,
- timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold(),
+ timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold(),
startTime);
if (execution.isQuery()) {
queryExecutionMap.put(queryId, execution);
@@ -168,13 +172,13 @@ public class Coordinator {
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ExecutorService getQueryExecutor() {
int coordinatorReadExecutorSize =
- config.isClusterMode() ? config.getCoordinatorReadExecutorSize() : 1;
+ CONFIG.isClusterMode() ? CONFIG.getCoordinatorReadExecutorSize() : 1;
return IoTDBThreadPoolFactory.newFixedThreadPool(
coordinatorReadExecutorSize, COORDINATOR_EXECUTOR_NAME);
}
private ExecutorService getWriteExecutor() {
- int coordinatorWriteExecutorSize = config.getCoordinatorWriteExecutorSize();
+ int coordinatorWriteExecutorSize = CONFIG.getCoordinatorWriteExecutorSize();
return IoTDBThreadPoolFactory.newFixedThreadPool(
coordinatorWriteExecutorSize, COORDINATOR_WRITE_EXECUTOR_NAME);
}
@@ -196,6 +200,15 @@ public class Coordinator {
LOGGER.debug("[CleanUpQuery]]");
queryExecution.stopAndCleanup();
queryExecutionMap.remove(queryId);
+ if (queryExecution.isQuery()) {
+ long costTime = System.currentTimeMillis() - queryExecution.getStartExecutionTime();
+ if (costTime >= CONFIG.getSlowQueryThreshold()) {
+ SLOW_SQL_LOGGER.info(
+ "Cost: {} ms, sql is {}",
+ costTime,
+ queryExecution.getExecuteSQL().orElse("UNKNOWN"));
+ }
+ }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index b89bc3c667..6c3f51c6d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -49,4 +49,8 @@ public interface IQueryExecution {
boolean isQuery();
String getQueryId();
+
+ long getStartExecutionTime();
+
+ Optional<String> getExecuteSQL();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 88b954af7d..387f9d7258 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -579,6 +579,16 @@ public class QueryExecution implements IQueryExecution {
return context.getQueryId().getId();
}
+ @Override
+ public long getStartExecutionTime() {
+ return context.getStartTime();
+ }
+
+ @Override
+ public Optional<String> getExecuteSQL() {
+ return Optional.ofNullable(context.getSql());
+ }
+
public String toString() {
return String.format("QueryExecution[%s]", context.getQueryId());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index 7e379425fe..c18be1ecf5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -219,4 +219,14 @@ public class ConfigExecution implements IQueryExecution {
public String getQueryId() {
return context.getQueryId().getId();
}
+
+ @Override
+ public long getStartExecutionTime() {
+ return context.getStartTime();
+ }
+
+ @Override
+ public Optional<String> getExecuteSQL() {
+ return Optional.ofNullable(context.getSql());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index e26c4e603f..e002d89ad5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -112,6 +112,7 @@ import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -127,11 +128,8 @@ import java.util.Map;
import java.util.Optional;
import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.SESSION_MANAGER;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -152,19 +150,25 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@FunctionalInterface
public interface SelectResult {
- public void apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize)
+ boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize)
throws IoTDBException, IOException;
}
private static final SelectResult SELECT_RESULT =
- (resp, queryExecution, fetchSize) ->
- resp.setQueryResult(
- QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize));
+ (resp, queryExecution, fetchSize) -> {
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize);
+ resp.setQueryResult(pair.left);
+ return pair.right;
+ };
private static final SelectResult OLD_SELECT_RESULT =
- (resp, queryExecution, fetchSize) ->
- resp.setQueryDataSet(
- QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize));
+ (resp, queryExecution, fetchSize) -> {
+ Pair<TSQueryDataSet, Boolean> pair =
+ QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize);
+ resp.setQueryDataSet(pair.left);
+ return pair.right;
+ };
public ClientRPCServiceImpl() {
if (config.isClusterMode()) {
@@ -178,6 +182,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
private TSExecuteStatementResp executeStatementInternal(
TSExecuteStatementReq req, SelectResult setResult) {
+ boolean finished = false;
+ long queryId = Long.MIN_VALUE;
String statement = req.getStatement();
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
@@ -203,8 +209,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
- long queryId =
- SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+ queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
@@ -228,26 +233,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
if (queryExecution != null && queryExecution.isQuery()) {
resp = createResponse(queryExecution.getDatasetHeader(), queryId);
resp.setStatus(result.status);
- setResult.apply(resp, queryExecution, req.fetchSize);
+ finished = setResult.apply(resp, queryExecution, req.fetchSize);
+ resp.setMoreData(!finished);
} else {
resp = RpcUtils.getTSExecuteStatementResp(result.status);
}
return resp;
}
} catch (Exception e) {
+ finished = true;
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
- long costTime = System.currentTimeMillis() - startTime;
- if (costTime >= CONFIG.getSlowQueryThreshold()) {
- SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(queryId);
}
}
}
private TSExecuteStatementResp executeRawDataQueryInternal(
TSRawDataQueryReq req, SelectResult setResult) {
+ boolean finished = false;
+ long queryId = Long.MIN_VALUE;
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -264,8 +272,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req);
- long queryId =
- SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+ queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
@@ -288,27 +295,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
if (queryExecution.isQuery()) {
resp = createResponse(queryExecution.getDatasetHeader(), queryId);
resp.setStatus(result.status);
- setResult.apply(resp, queryExecution, req.fetchSize);
+ finished = setResult.apply(resp, queryExecution, req.fetchSize);
+ resp.setMoreData(!finished);
} else {
resp = RpcUtils.getTSExecuteStatementResp(result.status);
}
return resp;
}
} catch (Exception e) {
- // TODO call the coordinator to release query resource
+ finished = true;
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
- long costTime = System.currentTimeMillis() - startTime;
- if (costTime >= CONFIG.getSlowQueryThreshold()) {
- SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(queryId);
}
}
}
private TSExecuteStatementResp executeLastDataQueryInternal(
TSLastDataQueryReq req, SelectResult setResult) {
+ boolean finished = false;
+ long queryId = Long.MIN_VALUE;
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -323,8 +332,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Last Data Query: {}", req.sessionId, req);
- long queryId =
- SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+ queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
@@ -347,7 +355,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
if (queryExecution.isQuery()) {
resp = createResponse(queryExecution.getDatasetHeader(), queryId);
resp.setStatus(result.status);
- setResult.apply(resp, queryExecution, req.fetchSize);
+ finished = setResult.apply(resp, queryExecution, req.fetchSize);
+ resp.setMoreData(!finished);
} else {
resp = RpcUtils.getTSExecuteStatementResp(result.status);
}
@@ -355,14 +364,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
} catch (Exception e) {
- // TODO call the coordinator to release query resource
+ finished = true;
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
- long costTime = System.currentTimeMillis() - startTime;
- if (costTime >= CONFIG.getSlowQueryThreshold()) {
- SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(queryId);
}
}
}
@@ -394,6 +402,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
+ long startTime = System.currentTimeMillis();
+ boolean finished = false;
try {
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
@@ -401,20 +411,33 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
+
+ if (queryExecution == null) {
+ resp.setHasResultSet(false);
+ resp.setMoreData(false);
+ return resp;
+ }
+
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
- List<ByteBuffer> result =
+ Pair<List<ByteBuffer>, Boolean> pair =
QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize);
+ List<ByteBuffer> result = pair.left;
+ finished = pair.right;
boolean hasResultSet = !(result.size() == 0);
resp.setHasResultSet(hasResultSet);
resp.setIsAlign(true);
resp.setQueryResult(result);
- if (!hasResultSet) {
- COORDINATOR.cleanupQueryExecution(req.queryId);
- }
+ resp.setMoreData(!finished);
return resp;
}
} catch (Exception e) {
+ finished = true;
return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(req.queryId);
+ }
}
}
@@ -872,6 +895,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
+ boolean finished = false;
+ long startTime = System.currentTimeMillis();
try {
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
@@ -880,21 +905,32 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
+ if (queryExecution == null) {
+ resp.setHasResultSet(false);
+ resp.setMoreData(false); // no cached execution: nothing left to fetch (same as fetchResultsV2)
+ return resp;
+ }
+
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
- TSQueryDataSet result =
+ Pair<TSQueryDataSet, Boolean> pair =
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize);
+ TSQueryDataSet result = pair.left;
+ finished = pair.right;
boolean hasResultSet = result.bufferForTime().limit() != 0;
-
resp.setHasResultSet(hasResultSet);
resp.setQueryDataSet(result);
resp.setIsAlign(true);
- if (!hasResultSet) {
- COORDINATOR.cleanupQueryExecution(req.queryId);
- }
+ resp.setMoreData(!finished); // moreData is the negation of finished (same as fetchResultsV2)
return resp;
}
} catch (Exception e) {
+ finished = true;
return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(req.queryId);
+ }
}
}
@@ -1516,10 +1552,6 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
return null;
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
- long costTime = System.currentTimeMillis() - startTime;
- if (costTime >= CONFIG.getSlowQueryThreshold()) {
- SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
- }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index ca603bf535..e0575880ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -179,8 +180,9 @@ public class QueryDataSetUtils {
return tsQueryDataSet;
}
- public static TSQueryDataSet convertTsBlockByFetchSize(
+ public static Pair<TSQueryDataSet, Boolean> convertTsBlockByFetchSize(
IQueryExecution queryExecution, int fetchSize) throws IOException, IoTDBException {
+ boolean finished = false;
int columnNum = queryExecution.getOutputValueColumnCount();
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
// one time column and each value column has an actual value buffer and a bitmap value to
@@ -201,6 +203,7 @@ public class QueryDataSetUtils {
while (rowCount < fetchSize) {
Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
if (!optionalTsBlock.isPresent()) {
+ finished = true;
break;
}
TsBlock tsBlock = optionalTsBlock.get();
@@ -371,11 +374,12 @@ public class QueryDataSetUtils {
}
tsQueryDataSet.setBitmapList(bitmapList);
tsQueryDataSet.setValueList(valueList);
- return tsQueryDataSet;
+ return new Pair<>(tsQueryDataSet, finished);
}
+ /** pair.left is the list of serialized TsBlocks; pair.right indicates whether the query has finished. */
// To fetch required amounts of data and combine them through List
- public static List<ByteBuffer> convertQueryResultByFetchSize(
+ public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize(
IQueryExecution queryExecution, int fetchSize) throws IoTDBException {
int rowCount = 0;
List<ByteBuffer> res = new ArrayList<>();
@@ -397,7 +401,7 @@ public class QueryDataSetUtils {
}
rowCount += positionCount;
}
- return res;
+ return new Pair<>(res, !queryExecution.hasNextResult());
}
public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
index 3077c571be..5e78af7bce 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
@@ -53,7 +53,6 @@ public class IoTDBRpcDataSet {
public List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList
public int fetchSize;
public final long timeout;
- public boolean emptyResultSet = false;
public boolean hasCachedRecord = false;
public boolean lastReadWasNull;
@@ -64,7 +63,8 @@ public class IoTDBRpcDataSet {
public long queryId;
public long statementId;
public boolean ignoreTimeStamp;
- public boolean isRpcFetchResult;
+ // indicates that there is still more data on the server side, so fetchResults can be called to get more
+ public boolean moreData;
public static final TsBlockSerde serde = new TsBlockSerde();
public List<ByteBuffer> queryResult;
@@ -81,7 +81,7 @@ public class IoTDBRpcDataSet {
List<String> columnTypeList,
Map<String, Integer> columnNameIndex,
boolean ignoreTimeStamp,
- boolean isRpcFetchResult,
+ boolean moreData,
long queryId,
long statementId,
IClientRPCService.Iface client,
@@ -97,7 +97,7 @@ public class IoTDBRpcDataSet {
this.client = client;
this.fetchSize = fetchSize;
this.timeout = timeout;
- this.isRpcFetchResult = isRpcFetchResult;
+ this.moreData = moreData;
columnSize = columnNameList.size();
this.columnNameList = new ArrayList<>();
@@ -153,7 +153,6 @@ public class IoTDBRpcDataSet {
this.queryResultIndex = 0;
this.tsBlockSize = 0;
this.tsBlockIndex = -1;
- this.emptyResultSet = this.queryResultSize == 0;
}
public IoTDBRpcDataSet(
@@ -162,7 +161,7 @@ public class IoTDBRpcDataSet {
List<String> columnTypeList,
Map<String, Integer> columnNameIndex,
boolean ignoreTimeStamp,
- boolean isRpcFetchResult,
+ boolean moreData,
long queryId,
long statementId,
IClientRPCService.Iface client,
@@ -180,7 +179,7 @@ public class IoTDBRpcDataSet {
this.client = client;
this.fetchSize = fetchSize;
this.timeout = timeout;
- this.isRpcFetchResult = isRpcFetchResult;
+ this.moreData = moreData;
columnSize = columnNameList.size();
this.columnNameList = new ArrayList<>();
@@ -245,7 +244,6 @@ public class IoTDBRpcDataSet {
this.queryResultIndex = 0;
this.tsBlockSize = 0;
this.tsBlockIndex = -1;
- this.emptyResultSet = this.queryResultSize == 0;
}
public void close() throws StatementExecutionException, TException {
@@ -280,16 +278,8 @@ public class IoTDBRpcDataSet {
constructOneRow();
return true;
}
- if (emptyResultSet) {
- try {
- close();
- return false;
- } catch (TException e) {
- throw new IoTDBConnectionException(
- "Cannot close dataset, because of network connection: {} ", e);
- }
- }
- if (isRpcFetchResult && fetchResults() && hasCachedByteBuffer()) {
+
+ if (moreData && fetchResults() && hasCachedByteBuffer()) {
constructOneTsBlock();
constructOneRow();
return true;
@@ -309,10 +299,9 @@ public class IoTDBRpcDataSet {
req.setTimeout(timeout);
try {
TSFetchResultsResp resp = client.fetchResultsV2(req);
-
RpcUtils.verifySuccess(resp.getStatus());
+ moreData = resp.moreData;
if (!resp.hasResultSet) {
- emptyResultSet = true;
close();
} else {
queryResult = resp.getQueryResult();
@@ -323,7 +312,6 @@ public class IoTDBRpcDataSet {
}
this.tsBlockSize = 0;
this.tsBlockIndex = -1;
- this.emptyResultSet = this.queryResultSize == 0;
}
return resp.hasResultSet;
} catch (TException e) {
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 630f6ffdeb..bd2ca927bc 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -377,7 +377,8 @@ public class SessionConnection {
sessionId,
execResp.queryResult,
execResp.isIgnoreTimeStamp(),
- timeout);
+ timeout,
+ execResp.moreData);
}
protected void executeNonQueryStatement(String sql)
@@ -439,7 +440,8 @@ public class SessionConnection {
client,
sessionId,
execResp.queryResult,
- execResp.isIgnoreTimeStamp());
+ execResp.isIgnoreTimeStamp(),
+ execResp.moreData);
}
protected SessionDataSet executeLastDataQuery(List<String> paths, long time, long timeOut)
@@ -478,7 +480,8 @@ public class SessionConnection {
client,
sessionId,
tsExecuteStatementResp.queryResult,
- tsExecuteStatementResp.isIgnoreTimeStamp());
+ tsExecuteStatementResp.isIgnoreTimeStamp(),
+ tsExecuteStatementResp.moreData);
}
protected void insertRecord(TSInsertRecordReq request)
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index f7b725ab73..c07336bb36 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -51,7 +51,8 @@ public class SessionDataSet implements AutoCloseable {
IClientRPCService.Iface client,
long sessionId,
List<ByteBuffer> queryResult,
- boolean ignoreTimeStamp) {
+ boolean ignoreTimeStamp,
+ boolean moreData) {
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
sql,
@@ -59,7 +60,7 @@ public class SessionDataSet implements AutoCloseable {
columnTypeList,
columnNameIndex,
ignoreTimeStamp,
- true,
+ moreData,
queryId,
statementId,
client,
@@ -80,7 +81,8 @@ public class SessionDataSet implements AutoCloseable {
long sessionId,
List<ByteBuffer> queryResult,
boolean ignoreTimeStamp,
- long timeout) {
+ long timeout,
+ boolean moreData) {
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
sql,
@@ -88,7 +90,7 @@ public class SessionDataSet implements AutoCloseable {
columnTypeList,
columnNameIndex,
ignoreTimeStamp,
- true,
+ moreData,
queryId,
statementId,
client,
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 61744021c8..0cccfe9ee2 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -68,6 +68,7 @@ struct TSExecuteStatementResp {
11: optional list<byte> aliasColumns
12: optional TSTracingInfo tracingInfo
13: optional list<binary> queryResult
+ 14: optional bool moreData
}
enum TSProtocolVersion {
@@ -176,6 +177,7 @@ struct TSFetchResultsResp{
4: optional TSQueryDataSet queryDataSet
5: optional TSQueryNonAlignDataSet nonAlignQueryDataSet
6: optional list<binary> queryResult
+ 7: optional bool moreData
}
struct TSFetchMetadataResp{