You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/02 07:17:11 UTC
[iotdb] 02/02: Save one rpc call
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-3773
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bc31698ac8116b4e0312e0fa6340cfb01d4e609e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Nov 2 15:16:46 2022 +0800
Save one rpc call
---
.../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java | 72 +++++++-------
.../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java | 12 +--
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 5 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 105 +++++++++++++++------
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 14 ++-
.../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 15 +--
.../apache/iotdb/session/SessionConnection.java | 9 +-
.../org/apache/iotdb/session/SessionDataSet.java | 10 +-
thrift/src/main/thrift/client.thrift | 2 +
9 files changed, 154 insertions(+), 90 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
index f900c32c99..2977eb60dc 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
@@ -606,12 +606,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -651,12 +651,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -706,7 +706,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -952,7 +952,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -1061,7 +1061,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -1116,12 +1116,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1230,12 +1230,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1330,7 +1330,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -1399,7 +1399,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -1452,12 +1452,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1501,12 +1501,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1705,7 +1705,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -1761,12 +1761,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1806,12 +1806,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -1952,7 +1952,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2006,12 +2006,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -2047,12 +2047,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -2171,7 +2171,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2211,7 +2211,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2383,7 +2383,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2560,7 +2560,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2763,7 +2763,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
true,
client,
null,
- 0,
+ -1,
sessionId,
Collections.singletonList(tsBlock),
null,
@@ -2806,12 +2806,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
@@ -2860,12 +2860,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
false,
client,
null,
- 0,
+ -1,
sessionId,
null,
null,
(long) 60 * 1000,
- true);
+ false);
}
@Override
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
index 1222ad128e..8fc39b31e3 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
@@ -60,8 +60,6 @@ public class IoTDBJDBCResultSet implements ResultSet {
protected List<String> columnTypeList;
protected IoTDBRpcDataSet ioTDBRpcDataSet;
protected IoTDBTracingInfo ioTDBRpcTracingInfo;
- private boolean isRpcFetchResult = true;
-
private String operationType = "";
private List<String> columns = null;
private List<String> sgColumns = null;
@@ -82,7 +80,8 @@ public class IoTDBJDBCResultSet implements ResultSet {
String operationType,
List<String> columns,
List<String> sgColumns,
- BitSet aliasColumnMap)
+ BitSet aliasColumnMap,
+ boolean moreData)
throws SQLException {
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
@@ -91,7 +90,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
columnTypeList,
columnNameIndex,
ignoreTimeStamp,
- true,
+ moreData,
queryId,
((IoTDBStatement) statement).getStmtId(),
client,
@@ -125,7 +124,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
List<ByteBuffer> dataSet,
TSTracingInfo tracingInfo,
long timeout,
- boolean isRpcFetchResult)
+ boolean moreData)
throws SQLException {
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
@@ -134,7 +133,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
columnTypeList,
columnNameIndex,
ignoreTimeStamp,
- isRpcFetchResult,
+ moreData,
queryId,
((IoTDBStatement) statement).getStmtId(),
client,
@@ -144,7 +143,6 @@ public class IoTDBJDBCResultSet implements ResultSet {
timeout);
this.statement = statement;
this.columnTypeList = columnTypeList;
- this.isRpcFetchResult = isRpcFetchResult;
if (tracingInfo != null) {
ioTDBRpcTracingInfo = new IoTDBTracingInfo();
ioTDBRpcTracingInfo.setTsTracingInfo(tracingInfo);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 3fb925e46b..b3633436ef 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -302,7 +302,7 @@ public class IoTDBStatement implements Statement {
execResp.queryResult,
execResp.tracingInfo,
execReq.timeout,
- true);
+ execResp.moreData);
}
return true;
}
@@ -457,7 +457,8 @@ public class IoTDBStatement implements Statement {
execResp.operationType,
execResp.columns,
execResp.sgColumns,
- aliasColumn);
+ aliasColumn,
+ execResp.moreData);
}
return resultSet;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 0b8526bad2..2efdf6a85a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -112,6 +112,7 @@ import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -149,19 +150,25 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@FunctionalInterface
public interface SelectResult {
- public void apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize)
+ boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize)
throws IoTDBException, IOException;
}
private static final SelectResult SELECT_RESULT =
- (resp, queryExecution, fetchSize) ->
- resp.setQueryResult(
- QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize));
+ (resp, queryExecution, fetchSize) -> {
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize);
+ resp.setQueryResult(pair.left);
+ return pair.right;
+ };
private static final SelectResult OLD_SELECT_RESULT =
- (resp, queryExecution, fetchSize) ->
- resp.setQueryDataSet(
- QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize));
+ (resp, queryExecution, fetchSize) -> {
+ Pair<TSQueryDataSet, Boolean> pair =
+ QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize);
+ resp.setQueryDataSet(pair.left);
+ return pair.right;
+ };
public ClientRPCServiceImpl() {
if (config.isClusterMode()) {
@@ -175,6 +182,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
private TSExecuteStatementResp executeStatementInternal(
TSExecuteStatementReq req, SelectResult setResult) {
+ boolean finished = false;
+ long queryId = Long.MIN_VALUE;
String statement = req.getStatement();
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
@@ -200,8 +209,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
- long queryId =
- SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+ queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
@@ -225,22 +233,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
if (queryExecution != null && queryExecution.isQuery()) {
resp = createResponse(queryExecution.getDatasetHeader(), queryId);
resp.setStatus(result.status);
- setResult.apply(resp, queryExecution, req.fetchSize);
+ finished = setResult.apply(resp, queryExecution, req.fetchSize);
+ resp.setMoreData(!finished);
} else {
resp = RpcUtils.getTSExecuteStatementResp(result.status);
}
return resp;
}
} catch (Exception e) {
+ finished = true;
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(queryId);
+ }
}
}
private TSExecuteStatementResp executeRawDataQueryInternal(
TSRawDataQueryReq req, SelectResult setResult) {
+ boolean finished = false;
+ long queryId = Long.MIN_VALUE;
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -257,8 +272,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req);
- long queryId =
- SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+ queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
@@ -281,23 +295,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
if (queryExecution.isQuery()) {
resp = createResponse(queryExecution.getDatasetHeader(), queryId);
resp.setStatus(result.status);
- setResult.apply(resp, queryExecution, req.fetchSize);
+ finished = setResult.apply(resp, queryExecution, req.fetchSize);
+ resp.setMoreData(!finished);
} else {
resp = RpcUtils.getTSExecuteStatementResp(result.status);
}
return resp;
}
} catch (Exception e) {
- // TODO call the coordinator to release query resource
+ finished = true;
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(queryId);
+ }
}
}
private TSExecuteStatementResp executeLastDataQueryInternal(
TSLastDataQueryReq req, SelectResult setResult) {
+ boolean finished = false;
+ long queryId = Long.MIN_VALUE;
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -312,8 +332,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Last Data Query: {}", req.sessionId, req);
- long queryId =
- SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+ queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
@@ -336,7 +355,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
if (queryExecution.isQuery()) {
resp = createResponse(queryExecution.getDatasetHeader(), queryId);
resp.setStatus(result.status);
- setResult.apply(resp, queryExecution, req.fetchSize);
+ finished = setResult.apply(resp, queryExecution, req.fetchSize);
+ resp.setMoreData(!finished);
} else {
resp = RpcUtils.getTSExecuteStatementResp(result.status);
}
@@ -344,11 +364,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
} catch (Exception e) {
- // TODO call the coordinator to release query resource
+ finished = true;
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(queryId);
+ }
}
}
@@ -379,6 +402,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
+ long startTime = System.currentTimeMillis();
+ boolean finished = false;
try {
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
@@ -386,20 +411,33 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
+
+ if (queryExecution == null) {
+ resp.setHasResultSet(false);
+ resp.setMoreData(true);
+ return resp;
+ }
+
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
- List<ByteBuffer> result =
+ Pair<List<ByteBuffer>, Boolean> pair =
QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize);
+ List<ByteBuffer> result = pair.left;
+ finished = pair.right;
boolean hasResultSet = !(result.size() == 0);
resp.setHasResultSet(hasResultSet);
resp.setIsAlign(true);
resp.setQueryResult(result);
- if (!hasResultSet) {
- COORDINATOR.cleanupQueryExecution(req.queryId);
- }
+ resp.setMoreData(!finished);
return resp;
}
} catch (Exception e) {
+ finished = true;
return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(req.queryId);
+ }
}
}
@@ -857,6 +895,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
+ boolean finished = false;
+ long startTime = System.currentTimeMillis();
try {
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
@@ -865,21 +905,32 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
+ if (queryExecution == null) {
+ resp.setHasResultSet(false);
+ resp.setMoreData(true);
+ return resp;
+ }
+
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
- TSQueryDataSet result =
+ Pair<TSQueryDataSet, Boolean> pair =
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize);
+ TSQueryDataSet result = pair.left;
+ finished = pair.right;
boolean hasResultSet = result.bufferForTime().limit() != 0;
-
resp.setHasResultSet(hasResultSet);
resp.setQueryDataSet(result);
resp.setIsAlign(true);
- if (!hasResultSet) {
- COORDINATOR.cleanupQueryExecution(req.queryId);
- }
+ resp.setMoreData(finished);
return resp;
}
} catch (Exception e) {
+ finished = true;
return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(req.queryId);
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index ca603bf535..ff3cda054a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -179,8 +180,9 @@ public class QueryDataSetUtils {
return tsQueryDataSet;
}
- public static TSQueryDataSet convertTsBlockByFetchSize(
+ public static Pair<TSQueryDataSet, Boolean> convertTsBlockByFetchSize(
IQueryExecution queryExecution, int fetchSize) throws IOException, IoTDBException {
+ boolean finished = false;
int columnNum = queryExecution.getOutputValueColumnCount();
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
// one time column and each value column has an actual value buffer and a bitmap value to
@@ -201,6 +203,7 @@ public class QueryDataSetUtils {
while (rowCount < fetchSize) {
Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
if (!optionalTsBlock.isPresent()) {
+ finished = true;
break;
}
TsBlock tsBlock = optionalTsBlock.get();
@@ -371,17 +374,20 @@ public class QueryDataSetUtils {
}
tsQueryDataSet.setBitmapList(bitmapList);
tsQueryDataSet.setValueList(valueList);
- return tsQueryDataSet;
+ return new Pair<>(tsQueryDataSet, finished);
}
+ /** pair.left is the serialized TsBlock data; pair.right indicates whether the query has finished. */
// To fetch required amounts of data and combine them through List
- public static List<ByteBuffer> convertQueryResultByFetchSize(
+ public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize(
IQueryExecution queryExecution, int fetchSize) throws IoTDBException {
int rowCount = 0;
List<ByteBuffer> res = new ArrayList<>();
+ boolean finished = false;
while (rowCount < fetchSize) {
Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
if (!optionalByteBuffer.isPresent()) {
+ finished = true;
break;
}
ByteBuffer byteBuffer = optionalByteBuffer.get();
@@ -397,7 +403,7 @@ public class QueryDataSetUtils {
}
rowCount += positionCount;
}
- return res;
+ return new Pair<>(res, finished);
}
public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
index 3077c571be..98a116b008 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
@@ -64,7 +64,8 @@ public class IoTDBRpcDataSet {
public long queryId;
public long statementId;
public boolean ignoreTimeStamp;
- public boolean isRpcFetchResult;
+ // indicates that there is still more data on the server side, so fetchResults can be called to get more
+ public boolean moreData;
public static final TsBlockSerde serde = new TsBlockSerde();
public List<ByteBuffer> queryResult;
@@ -81,7 +82,7 @@ public class IoTDBRpcDataSet {
List<String> columnTypeList,
Map<String, Integer> columnNameIndex,
boolean ignoreTimeStamp,
- boolean isRpcFetchResult,
+ boolean moreData,
long queryId,
long statementId,
IClientRPCService.Iface client,
@@ -97,7 +98,7 @@ public class IoTDBRpcDataSet {
this.client = client;
this.fetchSize = fetchSize;
this.timeout = timeout;
- this.isRpcFetchResult = isRpcFetchResult;
+ this.moreData = moreData;
columnSize = columnNameList.size();
this.columnNameList = new ArrayList<>();
@@ -162,7 +163,7 @@ public class IoTDBRpcDataSet {
List<String> columnTypeList,
Map<String, Integer> columnNameIndex,
boolean ignoreTimeStamp,
- boolean isRpcFetchResult,
+ boolean moreData,
long queryId,
long statementId,
IClientRPCService.Iface client,
@@ -180,7 +181,7 @@ public class IoTDBRpcDataSet {
this.client = client;
this.fetchSize = fetchSize;
this.timeout = timeout;
- this.isRpcFetchResult = isRpcFetchResult;
+ this.moreData = moreData;
columnSize = columnNameList.size();
this.columnNameList = new ArrayList<>();
@@ -289,7 +290,7 @@ public class IoTDBRpcDataSet {
"Cannot close dataset, because of network connection: {} ", e);
}
}
- if (isRpcFetchResult && fetchResults() && hasCachedByteBuffer()) {
+ if (moreData && fetchResults() && hasCachedByteBuffer()) {
constructOneTsBlock();
constructOneRow();
return true;
@@ -309,8 +310,8 @@ public class IoTDBRpcDataSet {
req.setTimeout(timeout);
try {
TSFetchResultsResp resp = client.fetchResultsV2(req);
-
RpcUtils.verifySuccess(resp.getStatus());
+ moreData = resp.moreData;
if (!resp.hasResultSet) {
emptyResultSet = true;
close();
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 630f6ffdeb..bd2ca927bc 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -377,7 +377,8 @@ public class SessionConnection {
sessionId,
execResp.queryResult,
execResp.isIgnoreTimeStamp(),
- timeout);
+ timeout,
+ execResp.moreData);
}
protected void executeNonQueryStatement(String sql)
@@ -439,7 +440,8 @@ public class SessionConnection {
client,
sessionId,
execResp.queryResult,
- execResp.isIgnoreTimeStamp());
+ execResp.isIgnoreTimeStamp(),
+ execResp.moreData);
}
protected SessionDataSet executeLastDataQuery(List<String> paths, long time, long timeOut)
@@ -478,7 +480,8 @@ public class SessionConnection {
client,
sessionId,
tsExecuteStatementResp.queryResult,
- tsExecuteStatementResp.isIgnoreTimeStamp());
+ tsExecuteStatementResp.isIgnoreTimeStamp(),
+ tsExecuteStatementResp.moreData);
}
protected void insertRecord(TSInsertRecordReq request)
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index f7b725ab73..c07336bb36 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -51,7 +51,8 @@ public class SessionDataSet implements AutoCloseable {
IClientRPCService.Iface client,
long sessionId,
List<ByteBuffer> queryResult,
- boolean ignoreTimeStamp) {
+ boolean ignoreTimeStamp,
+ boolean moreData) {
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
sql,
@@ -59,7 +60,7 @@ public class SessionDataSet implements AutoCloseable {
columnTypeList,
columnNameIndex,
ignoreTimeStamp,
- true,
+ moreData,
queryId,
statementId,
client,
@@ -80,7 +81,8 @@ public class SessionDataSet implements AutoCloseable {
long sessionId,
List<ByteBuffer> queryResult,
boolean ignoreTimeStamp,
- long timeout) {
+ long timeout,
+ boolean moreData) {
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
sql,
@@ -88,7 +90,7 @@ public class SessionDataSet implements AutoCloseable {
columnTypeList,
columnNameIndex,
ignoreTimeStamp,
- true,
+ moreData,
queryId,
statementId,
client,
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 61744021c8..0cccfe9ee2 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -68,6 +68,7 @@ struct TSExecuteStatementResp {
11: optional list<byte> aliasColumns
12: optional TSTracingInfo tracingInfo
13: optional list<binary> queryResult
+ 14: optional bool moreData
}
enum TSProtocolVersion {
@@ -176,6 +177,7 @@ struct TSFetchResultsResp{
4: optional TSQueryDataSet queryDataSet
5: optional TSQueryNonAlignDataSet nonAlignQueryDataSet
6: optional list<binary> queryResult
+ 7: optional bool moreData
}
struct TSFetchMetadataResp{