You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 08:42:12 UTC
[iotdb] 02/02: add V2 interface
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit edc3bb43b7d4613fd31e067e5abe7233885dd11d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 16:40:46 2022 +0800
add V2 interface
---
.../service/thrift/impl/ClientRPCServiceImpl.java | 59 +++++-
.../db/service/thrift/impl/TSServiceImpl.java | 5 +
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 203 +++++++++++++++++++++
.../apache/iotdb/session/SessionConnection.java | 2 +-
thrift/src/main/thrift/client.thrift | 5 +-
5 files changed, 270 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 61363a1d6c..a7796ac5d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -457,8 +457,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
TSFetchWindowBatchResp resp =
- createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
- resp.setWindowBatch(QueryDataSetUtils.convertTsBlocksToWindowBatch(queryExecution));
+ createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
+ resp.setWindowBatchDataSetList(QueryDataSetUtils.convertTsBlocksToWindowBatchDataSetList(queryExecution));
return resp;
}
} catch (Exception e) {
@@ -474,6 +474,61 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
}
+ @Override
+ public TSFetchWindowBatchResp fetchWindowBatchV2(TSFetchWindowBatchReq req) throws TException {
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
+ return RpcUtils.getTSFetchWindowBatchResp(getNotLoggedInStatus());
+ }
+ long startTime = System.currentTimeMillis();
+ try {
+ Statement s = StatementGenerator.createStatement(req);
+
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSFetchWindowBatchResp(status);
+ }
+
+ QUERY_FREQUENCY_RECORDER.incrementAndGet();
+ AUDIT_LOGGER.debug("Session {} execute fetch window set: {}", req.sessionId, req);
+ long queryId =
+ SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.execute(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ config.getQueryTimeoutThreshold());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new RuntimeException("error code: " + result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+ TSFetchWindowBatchResp resp =
+ createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
+ resp.setWindowBatch(QueryDataSetUtils.convertTsBlocksToWindowBatch(queryExecution));
+ return resp;
+ }
+ } catch (Exception e) {
+ // TODO call the coordinator to release query resource
+ return RpcUtils.getTSFetchWindowBatchResp(
+ onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ long costTime = System.currentTimeMillis() - startTime;
+ if (costTime >= CONFIG.getSlowQueryThreshold()) {
+ SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+ }
+ }
+ }
+
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 7470898fac..2ce613c3cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -283,6 +283,11 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
return null;
}
+ @Override
+ public TSFetchWindowBatchResp fetchWindowBatchV2(TSFetchWindowBatchReq req) throws TException {
+ return null;
+ }
+
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index d053e4b953..17dd769f5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -428,6 +428,209 @@ public class QueryDataSetUtils {
return windowSet;
}
+ public static List<TSQueryDataSet> convertTsBlocksToWindowBatchDataSetList(IQueryExecution queryExecution)
+ throws IoTDBException, IOException {
+ List<TSQueryDataSet> windowSet = new ArrayList<>();
+
+ int columnNum = queryExecution.getOutputValueColumnCount();
+ // one time column and each value column has an actual value buffer and a bitmap value to
+ // indicate whether it is a null
+ int columnNumWithTime = columnNum * 2 + 1;
+
+ while (true) {
+ Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+ if (!optionalTsBlock.isPresent()) {
+ break;
+ }
+ TsBlock tsBlock = optionalTsBlock.get();
+ if (tsBlock.isEmpty()) {
+ continue;
+ }
+
+ TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+
+ DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+ ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+ for (int i = 0; i < columnNumWithTime; i++) {
+ byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+ dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+ }
+
+ int rowCount = 0;
+ int[] valueOccupation = new int[columnNum];
+
+ // used to record a bitmap for every 8 points
+ int[] bitmaps = new int[columnNum];
+
+ int currentCount = tsBlock.getPositionCount();
+ // serialize time column
+ for (int i = 0; i < currentCount; i++) {
+ // use columnOutput to write byte array
+ dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+ }
+
+ // serialize each value column and its bitmap
+ for (int k = 0; k < columnNum; k++) {
+ // get DataOutputStream for current value column and its bitmap
+ DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
+
+ Column column = tsBlock.getColumn(k);
+ TSDataType type = column.getDataType();
+ switch (type) {
+ case INT32:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ dataOutputStream.writeInt(column.getInt(i));
+ valueOccupation[k] += 4;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ case INT64:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ dataOutputStream.writeLong(column.getLong(i));
+ valueOccupation[k] += 8;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ case FLOAT:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ dataOutputStream.writeFloat(column.getFloat(i));
+ valueOccupation[k] += 4;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ case DOUBLE:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ dataOutputStream.writeDouble(column.getDouble(i));
+ valueOccupation[k] += 8;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ case BOOLEAN:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ dataOutputStream.writeBoolean(column.getBoolean(i));
+ valueOccupation[k] += 1;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ case TEXT:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ Binary binary = column.getBinary(i);
+ dataOutputStream.writeInt(binary.getLength());
+ dataOutputStream.write(binary.getValues());
+ valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", type));
+ }
+ if (k != columnNum - 1) {
+ rowCount -= currentCount;
+ }
+ }
+
+ // feed the remaining bitmap
+ int remaining = rowCount % 8;
+ for (int k = 0; k < columnNum; k++) {
+ if (remaining != 0) {
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
+ dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
+ }
+ }
+
+ // calculate the time buffer size
+ int timeOccupation = rowCount * 8;
+ ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+ timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+ timeBuffer.flip();
+ tsQueryDataSet.setTime(timeBuffer);
+
+ // calculate the bitmap buffer size
+ int bitmapOccupation = (rowCount + 7) / 8;
+
+ List<ByteBuffer> bitmapList = new LinkedList<>();
+ List<ByteBuffer> valueList = new LinkedList<>();
+ for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+ ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
+ valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+ valueBuffer.flip();
+ valueList.add(valueBuffer);
+
+ ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+ bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
+ bitmapBuffer.flip();
+ bitmapList.add(bitmapBuffer);
+ }
+ tsQueryDataSet.setBitmapList(bitmapList);
+ tsQueryDataSet.setValueList(valueList);
+
+ windowSet.add(tsQueryDataSet);
+ }
+ return windowSet;
+ }
+
public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
long[] times = new long[size];
for (int i = 0; i < size; i++) {
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 1cfa1666c3..f0a11bcecc 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -506,7 +506,7 @@ public class SessionConnection {
TSFetchWindowBatchResp resp;
try {
- resp = client.fetchWindowBatch(req);
+ resp = client.fetchWindowBatchV2(req);
RpcUtils.verifySuccess(resp.getStatus());
} catch (TException e) {
throw new StatementExecutionException("");
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 6d5f6cfc11..f97fa4012e 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -434,7 +434,8 @@ struct TSFetchWindowBatchResp {
2: required list<string> columnNameList
3: required list<string> columnTypeList
4: required map<string, i32> columnNameIndexMap
- 5: required list<list<binary>> windowBatch
+ 5: optional list<list<binary>> windowBatch
+ 6: optional list<TSQueryDataSet> windowBatchDataSetList
}
// The sender and receiver need to check some info to confirm validity
@@ -586,4 +587,6 @@ service IClientRPCService {
TSConnectionInfoResp fetchAllConnectionsInfo();
TSFetchWindowBatchResp fetchWindowBatch(1:TSFetchWindowBatchReq req);
+
+ TSFetchWindowBatchResp fetchWindowBatchV2(1:TSFetchWindowBatchReq req);
}
\ No newline at end of file