You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/01 12:31:35 UTC

[iotdb] 06/07: implement java session

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit db93dfb3f79efcc3d7e9ff7303daeb7c85ee03c2
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 1 17:13:00 2022 +0800

    implement java session
---
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  14 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 207 ++-------------------
 .../java/org/apache/iotdb/session/ISession.java    |  10 +
 .../java/org/apache/iotdb/session/Session.java     |  14 ++
 .../apache/iotdb/session/SessionConnection.java    |  45 +++++
 thrift/src/main/thrift/client.thrift               |   8 +-
 6 files changed, 98 insertions(+), 200 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 24eb5bf432..1d7d875999 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -458,9 +458,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
 
       try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
-        TSFetchWindowSetResp resp = createResponse(queryExecution.getDatasetHeader());
-        resp.setStatus(result.status);
-        resp.setQueryDataSetList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution));
+        TSFetchWindowSetResp resp =
+            createTSFetchWindowSetResp(queryExecution.getDatasetHeader(), queryId);
+        resp.setQueryResultList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution));
         return resp;
       }
     } catch (Exception e) {
@@ -1798,10 +1798,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     return resp;
   }
 
-  private TSFetchWindowSetResp createResponse(DatasetHeader header) {
+  private TSFetchWindowSetResp createTSFetchWindowSetResp(DatasetHeader header, long queryId) {
     TSFetchWindowSetResp resp = RpcUtils.getTSFetchWindowSetResp(TSStatusCode.SUCCESS_STATUS);
-    resp.setColumns(header.getRespColumns());
-    resp.setDataTypeList(header.getRespDataTypeList());
+    resp.setColumnNameList(header.getRespColumns());
+    resp.setColumnTypeList(header.getRespDataTypeList());
+    resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
+    resp.setQueryId(queryId);
     return resp;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 50ae6ad575..9ac3cbd404 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -400,205 +400,30 @@ public class QueryDataSetUtils {
     return res;
   }
 
-  public static List<TSQueryDataSet> convertTsBlocksToWindowSet(IQueryExecution queryExecution)
-      throws IoTDBException, IOException {
-    List<TSQueryDataSet> windowSet = new ArrayList<>();
-
-    int columnNum = queryExecution.getOutputValueColumnCount();
-    // one time column and each value column has an actual value buffer and a bitmap value to
-    // indicate whether it is a null
-    int columnNumWithTime = columnNum * 2 + 1;
+  public static List<List<ByteBuffer>> convertTsBlocksToWindowSet(IQueryExecution queryExecution)
+      throws IoTDBException {
+    List<List<ByteBuffer>> windowSet = new ArrayList<>();
 
     while (true) {
-      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
-      if (!optionalTsBlock.isPresent()) {
+      Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
+      if (!optionalByteBuffer.isPresent()) {
         break;
       }
-      TsBlock tsBlock = optionalTsBlock.get();
-      if (tsBlock.isEmpty()) {
-        continue;
-      }
-
-      TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
-
-      DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
-      ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
-      for (int i = 0; i < columnNumWithTime; i++) {
-        byteArrayOutputStreams[i] = new ByteArrayOutputStream();
-        dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
-      }
-
-      int rowCount = 0;
-      int[] valueOccupation = new int[columnNum];
-
-      // used to record a bitmap for every 8 points
-      int[] bitmaps = new int[columnNum];
-
-      int currentCount = tsBlock.getPositionCount();
-      // serialize time column
-      for (int i = 0; i < currentCount; i++) {
-        // use columnOutput to write byte array
-        dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
-      }
-
-      // serialize each value column and its bitmap
-      for (int k = 0; k < columnNum; k++) {
-        // get DataOutputStream for current value column and its bitmap
-        DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
-        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
 
-        Column column = tsBlock.getColumn(k);
-        TSDataType type = column.getDataType();
-        switch (type) {
-          case INT32:
-            for (int i = 0; i < currentCount; i++) {
-              rowCount++;
-              if (column.isNull(i)) {
-                bitmaps[k] = bitmaps[k] << 1;
-              } else {
-                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
-                dataOutputStream.writeInt(column.getInt(i));
-                valueOccupation[k] += 4;
-              }
-              if (rowCount != 0 && rowCount % 8 == 0) {
-                dataBitmapOutputStream.writeByte(bitmaps[k]);
-                // we should clear the bitmap every 8 points
-                bitmaps[k] = 0;
-              }
-            }
-            break;
-          case INT64:
-            for (int i = 0; i < currentCount; i++) {
-              rowCount++;
-              if (column.isNull(i)) {
-                bitmaps[k] = bitmaps[k] << 1;
-              } else {
-                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
-                dataOutputStream.writeLong(column.getLong(i));
-                valueOccupation[k] += 8;
-              }
-              if (rowCount != 0 && rowCount % 8 == 0) {
-                dataBitmapOutputStream.writeByte(bitmaps[k]);
-                // we should clear the bitmap every 8 points
-                bitmaps[k] = 0;
-              }
-            }
-            break;
-          case FLOAT:
-            for (int i = 0; i < currentCount; i++) {
-              rowCount++;
-              if (column.isNull(i)) {
-                bitmaps[k] = bitmaps[k] << 1;
-              } else {
-                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
-                dataOutputStream.writeFloat(column.getFloat(i));
-                valueOccupation[k] += 4;
-              }
-              if (rowCount != 0 && rowCount % 8 == 0) {
-                dataBitmapOutputStream.writeByte(bitmaps[k]);
-                // we should clear the bitmap every 8 points
-                bitmaps[k] = 0;
-              }
-            }
-            break;
-          case DOUBLE:
-            for (int i = 0; i < currentCount; i++) {
-              rowCount++;
-              if (column.isNull(i)) {
-                bitmaps[k] = bitmaps[k] << 1;
-              } else {
-                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
-                dataOutputStream.writeDouble(column.getDouble(i));
-                valueOccupation[k] += 8;
-              }
-              if (rowCount != 0 && rowCount % 8 == 0) {
-                dataBitmapOutputStream.writeByte(bitmaps[k]);
-                // we should clear the bitmap every 8 points
-                bitmaps[k] = 0;
-              }
-            }
-            break;
-          case BOOLEAN:
-            for (int i = 0; i < currentCount; i++) {
-              rowCount++;
-              if (column.isNull(i)) {
-                bitmaps[k] = bitmaps[k] << 1;
-              } else {
-                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
-                dataOutputStream.writeBoolean(column.getBoolean(i));
-                valueOccupation[k] += 1;
-              }
-              if (rowCount != 0 && rowCount % 8 == 0) {
-                dataBitmapOutputStream.writeByte(bitmaps[k]);
-                // we should clear the bitmap every 8 points
-                bitmaps[k] = 0;
-              }
-            }
-            break;
-          case TEXT:
-            for (int i = 0; i < currentCount; i++) {
-              rowCount++;
-              if (column.isNull(i)) {
-                bitmaps[k] = bitmaps[k] << 1;
-              } else {
-                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
-                Binary binary = column.getBinary(i);
-                dataOutputStream.writeInt(binary.getLength());
-                dataOutputStream.write(binary.getValues());
-                valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
-              }
-              if (rowCount != 0 && rowCount % 8 == 0) {
-                dataBitmapOutputStream.writeByte(bitmaps[k]);
-                // we should clear the bitmap every 8 points
-                bitmaps[k] = 0;
-              }
-            }
-            break;
-          default:
-            throw new UnSupportedDataTypeException(
-                String.format("Data type %s is not supported.", type));
-        }
-        if (k != columnNum - 1) {
-          rowCount -= currentCount;
-        }
-      }
-
-      // feed the remaining bitmap
-      int remaining = rowCount % 8;
-      for (int k = 0; k < columnNum; k++) {
-        if (remaining != 0) {
-          DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
-          dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
-        }
+      List<ByteBuffer> res = new ArrayList<>();
+      ByteBuffer byteBuffer = optionalByteBuffer.get();
+      byteBuffer.mark();
+      int valueColumnCount = byteBuffer.getInt();
+      for (int i = 0; i < valueColumnCount; i++) {
+        byteBuffer.get();
       }
-
-      // calculate the time buffer size
-      int timeOccupation = rowCount * 8;
-      ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
-      timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
-      timeBuffer.flip();
-      tsQueryDataSet.setTime(timeBuffer);
-
-      // calculate the bitmap buffer size
-      int bitmapOccupation = (rowCount + 7) / 8;
-
-      List<ByteBuffer> bitmapList = new LinkedList<>();
-      List<ByteBuffer> valueList = new LinkedList<>();
-      for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
-        ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
-        valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
-        valueBuffer.flip();
-        valueList.add(valueBuffer);
-
-        ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
-        bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
-        bitmapBuffer.flip();
-        bitmapList.add(bitmapBuffer);
+      int positionCount = byteBuffer.getInt();
+      byteBuffer.reset();
+      if (positionCount != 0) {
+        res.add(byteBuffer);
       }
-      tsQueryDataSet.setBitmapList(bitmapList);
-      tsQueryDataSet.setValueList(valueList);
 
-      windowSet.add(tsQueryDataSet);
+      windowSet.add(res);
     }
     return windowSet;
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java
index 3f20aa228d..1b140e0d9f 100644
--- a/session/src/main/java/org/apache/iotdb/session/ISession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ISession.java
@@ -440,4 +440,14 @@ public interface ISession extends AutoCloseable {
   void sortTablet(Tablet tablet);
 
   TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException;
+
+  List<SessionDataSet> fetchWindowSet(
+      List<String> queryPaths,
+      String functionName,
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep,
+      List<Integer> indexes)
+      throws StatementExecutionException;
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index fd18e8a746..68e35b367b 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -3263,6 +3263,20 @@ public class Session implements ISession {
     return defaultSessionConnection.fetchAllConnections();
   }
 
+  @Override
+  public List<SessionDataSet> fetchWindowSet(
+      List<String> queryPaths,
+      String functionName,
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep,
+      List<Integer> indexes)
+      throws StatementExecutionException {
+    return defaultSessionConnection.fetchWindowSet(
+        queryPaths, functionName, startTime, endTime, interval, slidingStep, indexes);
+  }
+
   public static class Builder {
     private String host = SessionConfig.DEFAULT_HOST;
     private int rpcPort = SessionConfig.DEFAULT_PORT;
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 630f6ffdeb..5f24969113 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TGroupByTimeParameter;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -38,6 +39,8 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -66,6 +69,7 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
@@ -481,6 +485,47 @@ public class SessionConnection {
         tsExecuteStatementResp.isIgnoreTimeStamp());
   }
 
+  public List<SessionDataSet> fetchWindowSet(
+      List<String> queryPaths,
+      String functionName,
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep,
+      List<Integer> indexes)
+      throws StatementExecutionException {
+    TSFetchWindowSetReq req =
+        new TSFetchWindowSetReq(
+            sessionId,
+            statementId,
+            queryPaths,
+            new TGroupByTimeParameter(startTime, endTime, interval, slidingStep, indexes));
+    TSFetchWindowSetResp resp;
+    try {
+      resp = client.fetchWindowSet(req);
+      RpcUtils.verifySuccess(resp.getStatus());
+    } catch (TException e) {
+      throw new StatementExecutionException("");
+    }
+
+    List<SessionDataSet> windowSet = new ArrayList<>();
+    for (List<ByteBuffer> queryResult : resp.getQueryResultList()) {
+      windowSet.add(
+          new SessionDataSet(
+              "",
+              resp.columnNameList,
+              resp.columnTypeList,
+              resp.columnNameIndexMap,
+              resp.queryId,
+              statementId,
+              client,
+              sessionId,
+              queryResult,
+              false));
+    }
+    return windowSet;
+  }
+
   protected void insertRecord(TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
     request.setSessionId(sessionId);
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 4f956b0173..429853e123 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -431,9 +431,11 @@ struct TSFetchWindowSetReq {
 
 struct TSFetchWindowSetResp {
   1: required common.TSStatus status
-  2: required list<string> columns
-  3: required list<string> dataTypeList
-  4: required list<TSQueryDataSet> queryDataSetList
+  2: required i64 queryId
+  3: required list<string> columnNameList
+  4: required list<string> columnTypeList
+  5: required map<string, i32> columnNameIndexMap
+  6: required list<list<binary>> queryResultList
 }
 
 // The sender and receiver need to check some info to confirm validity