You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/01 12:31:35 UTC
[iotdb] 06/07: implement java session
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit db93dfb3f79efcc3d7e9ff7303daeb7c85ee03c2
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 1 17:13:00 2022 +0800
implement java session
---
.../service/thrift/impl/ClientRPCServiceImpl.java | 14 +-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 207 ++-------------------
.../java/org/apache/iotdb/session/ISession.java | 10 +
.../java/org/apache/iotdb/session/Session.java | 14 ++
.../apache/iotdb/session/SessionConnection.java | 45 +++++
thrift/src/main/thrift/client.thrift | 8 +-
6 files changed, 98 insertions(+), 200 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 24eb5bf432..1d7d875999 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -458,9 +458,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
- TSFetchWindowSetResp resp = createResponse(queryExecution.getDatasetHeader());
- resp.setStatus(result.status);
- resp.setQueryDataSetList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution));
+ TSFetchWindowSetResp resp =
+ createTSFetchWindowSetResp(queryExecution.getDatasetHeader(), queryId);
+ resp.setQueryResultList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution));
return resp;
}
} catch (Exception e) {
@@ -1798,10 +1798,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
return resp;
}
- private TSFetchWindowSetResp createResponse(DatasetHeader header) {
+ private TSFetchWindowSetResp createTSFetchWindowSetResp(DatasetHeader header, long queryId) {
TSFetchWindowSetResp resp = RpcUtils.getTSFetchWindowSetResp(TSStatusCode.SUCCESS_STATUS);
- resp.setColumns(header.getRespColumns());
- resp.setDataTypeList(header.getRespDataTypeList());
+ resp.setColumnNameList(header.getRespColumns());
+ resp.setColumnTypeList(header.getRespDataTypeList());
+ resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
+ resp.setQueryId(queryId);
return resp;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 50ae6ad575..9ac3cbd404 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -400,205 +400,30 @@ public class QueryDataSetUtils {
return res;
}
- public static List<TSQueryDataSet> convertTsBlocksToWindowSet(IQueryExecution queryExecution)
- throws IoTDBException, IOException {
- List<TSQueryDataSet> windowSet = new ArrayList<>();
-
- int columnNum = queryExecution.getOutputValueColumnCount();
- // one time column and each value column has an actual value buffer and a bitmap value to
- // indicate whether it is a null
- int columnNumWithTime = columnNum * 2 + 1;
+ public static List<List<ByteBuffer>> convertTsBlocksToWindowSet(IQueryExecution queryExecution)
+ throws IoTDBException {
+ List<List<ByteBuffer>> windowSet = new ArrayList<>();
while (true) {
- Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
- if (!optionalTsBlock.isPresent()) {
+ Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
+ if (!optionalByteBuffer.isPresent()) {
break;
}
- TsBlock tsBlock = optionalTsBlock.get();
- if (tsBlock.isEmpty()) {
- continue;
- }
-
- TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
-
- DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
- ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
- for (int i = 0; i < columnNumWithTime; i++) {
- byteArrayOutputStreams[i] = new ByteArrayOutputStream();
- dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
- }
-
- int rowCount = 0;
- int[] valueOccupation = new int[columnNum];
-
- // used to record a bitmap for every 8 points
- int[] bitmaps = new int[columnNum];
-
- int currentCount = tsBlock.getPositionCount();
- // serialize time column
- for (int i = 0; i < currentCount; i++) {
- // use columnOutput to write byte array
- dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
- }
-
- // serialize each value column and its bitmap
- for (int k = 0; k < columnNum; k++) {
- // get DataOutputStream for current value column and its bitmap
- DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
- DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
- Column column = tsBlock.getColumn(k);
- TSDataType type = column.getDataType();
- switch (type) {
- case INT32:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- dataOutputStream.writeInt(column.getInt(i));
- valueOccupation[k] += 4;
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- case INT64:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- dataOutputStream.writeLong(column.getLong(i));
- valueOccupation[k] += 8;
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- case FLOAT:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- dataOutputStream.writeFloat(column.getFloat(i));
- valueOccupation[k] += 4;
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- case DOUBLE:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- dataOutputStream.writeDouble(column.getDouble(i));
- valueOccupation[k] += 8;
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- case BOOLEAN:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- dataOutputStream.writeBoolean(column.getBoolean(i));
- valueOccupation[k] += 1;
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- case TEXT:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- Binary binary = column.getBinary(i);
- dataOutputStream.writeInt(binary.getLength());
- dataOutputStream.write(binary.getValues());
- valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", type));
- }
- if (k != columnNum - 1) {
- rowCount -= currentCount;
- }
- }
-
- // feed the remaining bitmap
- int remaining = rowCount % 8;
- for (int k = 0; k < columnNum; k++) {
- if (remaining != 0) {
- DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
- dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
- }
+ List<ByteBuffer> res = new ArrayList<>();
+ ByteBuffer byteBuffer = optionalByteBuffer.get();
+ byteBuffer.mark();
+ int valueColumnCount = byteBuffer.getInt();
+ for (int i = 0; i < valueColumnCount; i++) {
+ byteBuffer.get();
}
-
- // calculate the time buffer size
- int timeOccupation = rowCount * 8;
- ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
- timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
- timeBuffer.flip();
- tsQueryDataSet.setTime(timeBuffer);
-
- // calculate the bitmap buffer size
- int bitmapOccupation = (rowCount + 7) / 8;
-
- List<ByteBuffer> bitmapList = new LinkedList<>();
- List<ByteBuffer> valueList = new LinkedList<>();
- for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
- ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
- valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
- valueBuffer.flip();
- valueList.add(valueBuffer);
-
- ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
- bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
- bitmapBuffer.flip();
- bitmapList.add(bitmapBuffer);
+ int positionCount = byteBuffer.getInt();
+ byteBuffer.reset();
+ if (positionCount != 0) {
+ res.add(byteBuffer);
}
- tsQueryDataSet.setBitmapList(bitmapList);
- tsQueryDataSet.setValueList(valueList);
- windowSet.add(tsQueryDataSet);
+ windowSet.add(res);
}
return windowSet;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java
index 3f20aa228d..1b140e0d9f 100644
--- a/session/src/main/java/org/apache/iotdb/session/ISession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ISession.java
@@ -440,4 +440,14 @@ public interface ISession extends AutoCloseable {
void sortTablet(Tablet tablet);
TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException;
+
+ List<SessionDataSet> fetchWindowSet(
+ List<String> queryPaths,
+ String functionName,
+ long startTime,
+ long endTime,
+ long interval,
+ long slidingStep,
+ List<Integer> indexes)
+ throws StatementExecutionException;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index fd18e8a746..68e35b367b 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -3263,6 +3263,20 @@ public class Session implements ISession {
return defaultSessionConnection.fetchAllConnections();
}
+ @Override
+ public List<SessionDataSet> fetchWindowSet(
+ List<String> queryPaths,
+ String functionName,
+ long startTime,
+ long endTime,
+ long interval,
+ long slidingStep,
+ List<Integer> indexes)
+ throws StatementExecutionException {
+ return defaultSessionConnection.fetchWindowSet(
+ queryPaths, functionName, startTime, endTime, interval, slidingStep, indexes);
+ }
+
public static class Builder {
private String host = SessionConfig.DEFAULT_HOST;
private int rpcPort = SessionConfig.DEFAULT_PORT;
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 630f6ffdeb..5f24969113 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TGroupByTimeParameter;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -38,6 +39,8 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -66,6 +69,7 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
@@ -481,6 +485,47 @@ public class SessionConnection {
tsExecuteStatementResp.isIgnoreTimeStamp());
}
+ public List<SessionDataSet> fetchWindowSet(
+ List<String> queryPaths,
+ String functionName,
+ long startTime,
+ long endTime,
+ long interval,
+ long slidingStep,
+ List<Integer> indexes)
+ throws StatementExecutionException {
+ TSFetchWindowSetReq req =
+ new TSFetchWindowSetReq(
+ sessionId,
+ statementId,
+ queryPaths,
+ new TGroupByTimeParameter(startTime, endTime, interval, slidingStep, indexes));
+ TSFetchWindowSetResp resp;
+ try {
+ resp = client.fetchWindowSet(req);
+ RpcUtils.verifySuccess(resp.getStatus());
+ } catch (TException e) {
+ throw new StatementExecutionException("");
+ }
+
+ List<SessionDataSet> windowSet = new ArrayList<>();
+ for (List<ByteBuffer> queryResult : resp.getQueryResultList()) {
+ windowSet.add(
+ new SessionDataSet(
+ "",
+ resp.columnNameList,
+ resp.columnTypeList,
+ resp.columnNameIndexMap,
+ resp.queryId,
+ statementId,
+ client,
+ sessionId,
+ queryResult,
+ false));
+ }
+ return windowSet;
+ }
+
protected void insertRecord(TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
request.setSessionId(sessionId);
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 4f956b0173..429853e123 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -431,9 +431,11 @@ struct TSFetchWindowSetReq {
struct TSFetchWindowSetResp {
1: required common.TSStatus status
- 2: required list<string> columns
- 3: required list<string> dataTypeList
- 4: required list<TSQueryDataSet> queryDataSetList
+ 2: required i64 queryId
+ 3: required list<string> columnNameList
+ 4: required list<string> columnTypeList
+ 5: required map<string, i32> columnNameIndexMap
+ 6: required list<list<binary>> queryResultList
}
// The sender and receiver need to check some info to confirm validity