You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/12 08:27:13 UTC

[iotdb] 01/01: [IOTDB-2889] Add method converting TsBlock to TSQueryDataSet in QueryDataSetUtils

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8227c55084efc2c1a3a130591c3020082b0f2a1e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Apr 12 16:26:59 2022 +0800

    [IOTDB-2889] Add method converting TsBlock to TSQueryDataSet in QueryDataSetUtils
---
 .../iotdb/db/mpp/execution/QueryExecution.java     |  18 ++-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 177 ++++++++++++++++++++-
 2 files changed, 192 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 78458122d7..e70733b50e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -170,6 +170,22 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
+  /**
+   * Checks whether the result handle can still deliver TsBlocks. The result handle is initialized
+   * first if that has not happened yet.
+   *
+   * @return true if there are more TsBlocks, otherwise false
+   * @throws RuntimeException if initializing the result handle fails with an IOException; the
+   *     cause of that IOException is propagated
+   */
+  public boolean hasNextResult() {
+    try {
+      initialResultHandle();
+      // more results exist only while the handle is NOT finished
+      return !resultHandle.isFinished();
+    } catch (IOException e) {
+      throwIfUnchecked(e.getCause());
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  /**
+   * Reports how many value columns the result contains; the time column is not counted.
+   *
+   * @return the value column count, currently the fixed value 1
+   */
+  public int getOutputValueColumnCount() {
+    final int valueColumnCount = 1;
+    return valueColumnCount;
+  }
+
   /**
    * This method is a synchronized method. For READ, it will block until all the FragmentInstances
    * have been submitted. For WRITE, it will block until all the FragmentInstances have finished.
@@ -204,7 +220,7 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
-  private synchronized void initialResultHandle() throws IOException {
+  private void initialResultHandle() throws IOException {
     if (this.resultHandle == null) {
       this.resultHandle =
           DataBlockService.getInstance()
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 075e5a2ba7..c65e57f208 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,12 +18,15 @@
  */
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.db.mpp.execution.QueryExecution;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -40,7 +43,7 @@ import java.util.List;
 /** TimeValuePairUtils to convert between thrift format and TsFile format. */
 public class QueryDataSetUtils {
 
-  private static final int flag = 0x01;
+  private static final int FLAG = 0x01;
 
   private QueryDataSetUtils() {}
 
@@ -88,7 +91,7 @@ public class QueryDataSetUtils {
           if (field == null || field.getDataType() == null) {
             bitmap[k] = (bitmap[k] << 1);
           } else {
-            bitmap[k] = (bitmap[k] << 1) | flag;
+            bitmap[k] = (bitmap[k] << 1) | FLAG;
             TSDataType type = field.getDataType();
             switch (type) {
               case INT32:
@@ -173,6 +176,176 @@ public class QueryDataSetUtils {
     return tsQueryDataSet;
   }
 
+  /**
+   * Drains TsBlocks from the given query execution and packs them into a {@link TSQueryDataSet}.
+   *
+   * <p>The result holds one time buffer plus, for every value column, a value buffer containing
+   * only the non-null values and a bitmap buffer with one bit per row (1 = value present, 0 =
+   * null; the earliest row of each group of 8 occupies the most significant bit). Whole TsBlocks
+   * are consumed, so the number of returned rows may exceed {@code fetchSize}; no further TsBlock
+   * is requested once at least {@code fetchSize} rows have been collected or the execution
+   * reports that it has no next result.
+   *
+   * @param queryExecution source of the TsBlocks to convert
+   * @param fetchSize row count after which no further TsBlock is requested
+   * @return the data set holding the time, value and bitmap buffers
+   * @throws IOException if writing to the in-memory output streams fails
+   * @throws UnSupportedDataTypeException if a column has a data type other than INT32, INT64,
+   *     FLOAT, DOUBLE, BOOLEAN or TEXT
+   */
+  public static TSQueryDataSet convertTsBlockByFetchSize(
+      QueryExecution queryExecution, int fetchSize) throws IOException {
+    int columnNum = queryExecution.getOutputValueColumnCount();
+    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+    // one time column and each value column has an actual value buffer and a bitmap value to
+    // indicate whether it is a null
+    int columnNumWithTime = columnNum * 2 + 1;
+    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    int[] valueOccupation = new int[columnNum];
+    // not yet flushed bitmap bits of each column; kept across TsBlocks so that the bitmap bytes
+    // stay aligned to the overall row index even if a TsBlock's row count is not a multiple of 8
+    int[] pendingBitmaps = new int[columnNum];
+    while (rowCount < fetchSize && queryExecution.hasNextResult()) {
+      TsBlock tsBlock = queryExecution.getBatchResult();
+      int currentCount = tsBlock.getPositionCount();
+      for (int i = 0; i < currentCount; i++) {
+        // use columnOutput to write byte array
+        dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+      }
+      for (int k = 0; k < columnNum; k++) {
+        DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
+        // used to record a bitmap for every 8 row record; continue the partially filled byte
+        // left over from the previous TsBlock
+        int bitmap = pendingBitmaps[k];
+        Column column = tsBlock.getColumn(k);
+        TSDataType type = column.getDataType();
+        // the type switch is kept outside the row loop so it is evaluated once per column
+        switch (type) {
+          case INT32:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                dataOutputStream.writeInt(column.getInt(i));
+                valueOccupation[k] += 4;
+              }
+              // rowCount + i + 1 is the 1-based row index over all TsBlocks fetched so far
+              if ((rowCount + i + 1) % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          case INT64:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                dataOutputStream.writeLong(column.getLong(i));
+                valueOccupation[k] += 8;
+              }
+              if ((rowCount + i + 1) % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          case FLOAT:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                dataOutputStream.writeFloat(column.getFloat(i));
+                valueOccupation[k] += 4;
+              }
+              if ((rowCount + i + 1) % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                dataOutputStream.writeDouble(column.getDouble(i));
+                valueOccupation[k] += 8;
+              }
+              if ((rowCount + i + 1) % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          case BOOLEAN:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                dataOutputStream.writeBoolean(column.getBoolean(i));
+                valueOccupation[k] += 1;
+              }
+              if ((rowCount + i + 1) % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          case TEXT:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                Binary binary = column.getBinary(i);
+                // TEXT values are written as a 4-byte length followed by the raw bytes
+                dataOutputStream.writeInt(binary.getLength());
+                dataOutputStream.write(binary.getValues());
+                valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
+              }
+              if ((rowCount + i + 1) % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", type));
+        }
+        // remember the unflushed bits so the next TsBlock continues the same bitmap byte
+        pendingBitmaps[k] = bitmap;
+      }
+      rowCount += currentCount;
+    }
+
+    // feed the remaining bitmap once, after the last TsBlock: left-align the bits of the final,
+    // partially filled byte of every column
+    int remaining = rowCount % 8;
+    if (remaining != 0) {
+      for (int k = 0; k < columnNum; k++) {
+        dataOutputStreams[2 * (k + 1)].writeByte(pendingBitmaps[k] << (8 - remaining));
+      }
+    }
+
+    // calculate the time buffer size
+    int timeOccupation = rowCount * 8;
+    ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+    timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+    timeBuffer.flip();
+    tsQueryDataSet.setTime(timeBuffer);
+
+    // calculate the bitmap buffer size
+    int bitmapOccupation = (rowCount + 7) / 8;
+
+    List<ByteBuffer> bitmapList = new LinkedList<>();
+    List<ByteBuffer> valueList = new LinkedList<>();
+    // streams are laid out as [time, value0, bitmap0, value1, bitmap1, ...]
+    for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
+      valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+      valueBuffer.flip();
+      valueList.add(valueBuffer);
+
+      ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+      bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
+      bitmapBuffer.flip();
+      bitmapList.add(bitmapBuffer);
+    }
+    tsQueryDataSet.setBitmapList(bitmapList);
+    tsQueryDataSet.setValueList(valueList);
+    return tsQueryDataSet;
+  }
+
   public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
     long[] times = new long[size];
     for (int i = 0; i < size; i++) {