You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/01 12:31:34 UTC

[iotdb] 05/07: implement fetchWindowSet in ClientRPCServiceImpl

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7442531210b70324b16ba7ab6784e28cd23e26a3
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 1 16:36:15 2022 +0800

    implement fetchWindowSet in ClientRPCServiceImpl
---
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  66 ++++++-
 .../db/service/thrift/impl/TSServiceImpl.java      |   6 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 203 +++++++++++++++++++++
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  13 ++
 thrift/src/main/thrift/client.thrift               |  20 +-
 5 files changed, 292 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index b3d6f41230..24eb5bf432 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -70,8 +70,6 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TFetchWindowSetReq;
-import org.apache.iotdb.service.rpc.thrift.TFetchWindowSetResp;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -90,6 +88,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -421,8 +421,59 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   }
 
   @Override
-  public TFetchWindowSetResp fetchWindowSet(TFetchWindowSetReq req) throws TException {
-    return null;
+  public TSFetchWindowSetResp fetchWindowSet(TSFetchWindowSetReq req) throws TException {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
+      return RpcUtils.getTSFetchWindowSetResp(getNotLoggedInStatus());
+    }
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s =
+          StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId());
+
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSFetchWindowSetResp(status);
+      }
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute fetch window set: {}", req.sessionId, req);
+      long queryId =
+          SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              config.getQueryTimeoutThreshold());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("error code: " + result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSFetchWindowSetResp resp = createResponse(queryExecution.getDatasetHeader());
+        resp.setStatus(result.status);
+        resp.setQueryDataSetList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution));
+        return resp;
+      }
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource
+      return RpcUtils.getTSFetchWindowSetResp(
+          onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+      }
+    }
   }
 
   @Override
@@ -1747,6 +1798,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     return resp;
   }
 
+  private TSFetchWindowSetResp createResponse(DatasetHeader header) {
+    TSFetchWindowSetResp resp = RpcUtils.getTSFetchWindowSetResp(TSStatusCode.SUCCESS_STATUS);
+    resp.setColumns(header.getRespColumns());
+    resp.setDataTypeList(header.getRespDataTypeList());
+    return resp;
+  }
+
   private TSStatus getNotLoggedInStatus() {
     return RpcUtils.getStatus(
         TSStatusCode.NOT_LOGIN_ERROR,
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 06ef9eec23..9dd837b131 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -85,8 +85,6 @@ import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TFetchWindowSetReq;
-import org.apache.iotdb.service.rpc.thrift.TFetchWindowSetResp;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -105,6 +103,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -279,7 +279,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   }
 
   @Override
-  public TFetchWindowSetResp fetchWindowSet(TFetchWindowSetReq req) throws TException {
+  public TSFetchWindowSetResp fetchWindowSet(TSFetchWindowSetReq req) throws TException {
     return null;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index ca603bf535..50ae6ad575 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -400,6 +400,209 @@ public class QueryDataSetUtils {
     return res;
   }
 
+  public static List<TSQueryDataSet> convertTsBlocksToWindowSet(IQueryExecution queryExecution)
+      throws IoTDBException, IOException {
+    List<TSQueryDataSet> windowSet = new ArrayList<>();
+
+    int columnNum = queryExecution.getOutputValueColumnCount();
+    // one time column, plus for each value column an actual value buffer and a bitmap buffer
+    // indicating which of its values are null
+    int columnNumWithTime = columnNum * 2 + 1;
+
+    while (true) {
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      if (!optionalTsBlock.isPresent()) {
+        break;
+      }
+      TsBlock tsBlock = optionalTsBlock.get();
+      if (tsBlock.isEmpty()) {
+        continue;
+      }
+
+      TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+
+      DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+      ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+      for (int i = 0; i < columnNumWithTime; i++) {
+        byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+        dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+      }
+
+      int rowCount = 0;
+      int[] valueOccupation = new int[columnNum];
+
+      // used to record a bitmap for every 8 points
+      int[] bitmaps = new int[columnNum];
+
+      int currentCount = tsBlock.getPositionCount();
+      // serialize time column
+      for (int i = 0; i < currentCount; i++) {
+        // write each timestamp as a long to the time column stream
+        dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+      }
+
+      // serialize each value column and its bitmap
+      for (int k = 0; k < columnNum; k++) {
+        // get DataOutputStream for current value column and its bitmap
+        DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
+
+        Column column = tsBlock.getColumn(k);
+        TSDataType type = column.getDataType();
+        switch (type) {
+          case INT32:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeInt(column.getInt(i));
+                valueOccupation[k] += 4;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case INT64:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeLong(column.getLong(i));
+                valueOccupation[k] += 8;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case FLOAT:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeFloat(column.getFloat(i));
+                valueOccupation[k] += 4;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeDouble(column.getDouble(i));
+                valueOccupation[k] += 8;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case BOOLEAN:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeBoolean(column.getBoolean(i));
+                valueOccupation[k] += 1;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case TEXT:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                Binary binary = column.getBinary(i);
+                dataOutputStream.writeInt(binary.getLength());
+                dataOutputStream.write(binary.getValues());
+                valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", type));
+        }
+        if (k != columnNum - 1) {
+          rowCount -= currentCount;
+        }
+      }
+
+      // write out the last, partially filled bitmap byte, zero-padded on the low bits
+      int remaining = rowCount % 8;
+      for (int k = 0; k < columnNum; k++) {
+        if (remaining != 0) {
+          DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
+          dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
+        }
+      }
+
+      // calculate the time buffer size
+      int timeOccupation = rowCount * 8;
+      ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+      timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+      timeBuffer.flip();
+      tsQueryDataSet.setTime(timeBuffer);
+
+      // calculate the bitmap buffer size
+      int bitmapOccupation = (rowCount + 7) / 8;
+
+      List<ByteBuffer> bitmapList = new LinkedList<>();
+      List<ByteBuffer> valueList = new LinkedList<>();
+      for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+        ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
+        valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+        valueBuffer.flip();
+        valueList.add(valueBuffer);
+
+        ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+        bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
+        bitmapBuffer.flip();
+        bitmapList.add(bitmapBuffer);
+      }
+      tsQueryDataSet.setBitmapList(bitmapList);
+      tsQueryDataSet.setValueList(valueList);
+
+      windowSet.add(tsQueryDataSet);
+    }
+    return windowSet;
+  }
+
   public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
     long[] times = new long[size];
     for (int i = 0; i < size; i++) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 89c9a21c4a..cbedcf75e2 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxTSStatus;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
 
 import java.lang.reflect.Proxy;
 import java.text.SimpleDateFormat;
@@ -225,6 +226,18 @@ public class RpcUtils {
     return resp;
   }
 
+  public static TSFetchWindowSetResp getTSFetchWindowSetResp(TSStatus status) {
+    TSFetchWindowSetResp resp = new TSFetchWindowSetResp();
+    TSStatus tsStatus = new TSStatus(status);
+    resp.setStatus(tsStatus);
+    return resp;
+  }
+
+  public static TSFetchWindowSetResp getTSFetchWindowSetResp(TSStatusCode tsStatusCode) {
+    TSStatus status = getStatus(tsStatusCode);
+    return getTSFetchWindowSetResp(status);
+  }
+
   public static final String DEFAULT_TIME_FORMAT = "default";
   public static final String DEFAULT_TIMESTAMP_PRECISION = "ms";
 
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 51b0d24c29..4f956b0173 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -421,17 +421,19 @@ struct TGroupByTimeParameter {
   5: required list<i32> indexes
 }
 
-struct TFetchWindowSetReq {
+struct TSFetchWindowSetReq {
   1: required i64 sessionId
-  2: required list<string> queryPaths
-  3: optional string functionName
-  4: required TGroupByTimeParameter groupByTimeParameter
+  2: required i64 statementId
+  3: required list<string> queryPaths
+  4: optional string functionName
+  5: required TGroupByTimeParameter groupByTimeParameter
 }
 
-struct TFetchWindowSetResp {
-  1: required list<string> columns
-  2: required list<string> dataTypeList
-  3: required list<TSQueryDataSet> queryDataSetList
+struct TSFetchWindowSetResp {
+  1: required common.TSStatus status
+  2: required list<string> columns
+  3: required list<string> dataTypeList
+  4: required list<TSQueryDataSet> queryDataSetList
 }
 
 // The sender and receiver need to check some info to confirm validity
@@ -582,5 +584,5 @@ service IClientRPCService {
 
   TSConnectionInfoResp fetchAllConnectionsInfo();
 
-  TFetchWindowSetResp fetchWindowSet(1:TFetchWindowSetReq req)
+  TSFetchWindowSetResp fetchWindowSet(1:TSFetchWindowSetReq req);
 }
\ No newline at end of file