You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/12 12:50:45 UTC

[iotdb] branch master updated: [IOTDB-2889] Add method converting TsBlock to TSQueryDataSet in QueryDataSetUtils (#5490)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d3b30cf654 [IOTDB-2889] Add method converting TsBlock to TSQueryDataSet in QueryDataSetUtils (#5490)
d3b30cf654 is described below

commit d3b30cf6540f8550b7fe08a6f1044978c21435d2
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Tue Apr 12 20:50:40 2022 +0800

    [IOTDB-2889] Add method converting TsBlock to TSQueryDataSet in QueryDataSetUtils (#5490)
---
 .../iotdb/db/mpp/execution/QueryExecution.java     |  21 ++-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 181 ++++++++++++++++++++-
 2 files changed, 198 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 78458122d7..9b89162df0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -155,7 +155,7 @@ public class QueryExecution implements IQueryExecution {
    */
   public TsBlock getBatchResult() {
     try {
-      initialResultHandle();
+      initResultHandle();
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
       blocked.get();
       return resultHandle.receive();
@@ -170,6 +170,23 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
+  /**
+   * Checks whether more TsBlocks can still be fetched through {@link #getBatchResult()}.
+   *
+   * @return true if the result handle is not finished yet, otherwise false
+   */
+  public boolean hasNextResult() {
+    try {
+      initResultHandle();
+      // a finished handle has nothing left to hand out, so more results exist only while it is
+      // NOT finished (callers loop on this before calling getBatchResult())
+      return !resultHandle.isFinished();
+    } catch (IOException e) {
+      // the cause of an IOException may be null; throwIfUnchecked(null) would throw an NPE
+      // (assuming it is Guava's helper) and hide the real failure, so only unwrap a real cause
+      Throwable cause = e.getCause();
+      if (cause != null) {
+        throwIfUnchecked(cause);
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns how many value columns the query outputs; the time column is not counted.
+   *
+   * <p>TODO: derive the real count from the output columns in Analysis; a single value column is
+   * assumed for now.
+   */
+  public int getOutputValueColumnCount() {
+    final int assumedValueColumnCount = 1;
+    return assumedValueColumnCount;
+  }
+
   /**
    * This method is a synchronized method. For READ, it will block until all the FragmentInstances
    * have been submitted. For WRITE, it will block until all the FragmentInstances have finished.
@@ -204,7 +221,7 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
-  private synchronized void initialResultHandle() throws IOException {
+  private void initResultHandle() throws IOException {
     if (this.resultHandle == null) {
       this.resultHandle =
           DataBlockService.getInstance()
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 075e5a2ba7..29efa1a4c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,12 +18,15 @@
  */
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.db.mpp.execution.QueryExecution;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -40,7 +43,7 @@ import java.util.List;
 /** TimeValuePairUtils to convert between thrift format and TsFile format. */
 public class QueryDataSetUtils {
 
-  private static final int flag = 0x01;
+  /** Bit shifted into a column bitmap to mark a non-null value. */
+  private static final int FLAG = 0b1;
 
   private QueryDataSetUtils() {}
 
@@ -88,7 +91,7 @@ public class QueryDataSetUtils {
           if (field == null || field.getDataType() == null) {
             bitmap[k] = (bitmap[k] << 1);
           } else {
-            bitmap[k] = (bitmap[k] << 1) | flag;
+            bitmap[k] = (bitmap[k] << 1) | FLAG;
             TSDataType type = field.getDataType();
             switch (type) {
               case INT32:
@@ -173,6 +176,180 @@ public class QueryDataSetUtils {
     return tsQueryDataSet;
   }
 
+  /**
+   * Serializes the TsBlocks produced by {@code queryExecution} into a {@link TSQueryDataSet}.
+   *
+   * <p>TsBlocks are pulled while fewer than {@code fetchSize} rows have been collected and more
+   * results remain. Every TsBlock is consumed as a whole, so the returned data set may contain more
+   * than {@code fetchSize} rows.
+   *
+   * <p>Layout: the time buffer holds one long per row. Each value column gets a value buffer that
+   * holds only its non-null values, and a bitmap buffer with one bit per row (first row in the most
+   * significant bit, 1 = non-null) whose last byte is padded with zero bits.
+   *
+   * @param queryExecution source of the TsBlocks to serialize
+   * @param fetchSize row count after which no further TsBlock is fetched
+   * @return the serialized data set
+   * @throws IOException as declared by the {@link DataOutputStream} writes
+   * @throws UnSupportedDataTypeException if a non-null value has a type that cannot be serialized
+   */
+  public static TSQueryDataSet convertTsBlockByFetchSize(
+      QueryExecution queryExecution, int fetchSize) throws IOException {
+    int columnNum = queryExecution.getOutputValueColumnCount();
+    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+    // one time column and each value column has an actual value buffer and a bitmap buffer to
+    // indicate whether each value is null
+    int columnNumWithTime = columnNum * 2 + 1;
+    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    // the bitmap byte under construction for each value column; it is carried over from one
+    // TsBlock to the next so that the bits of all rows are packed contiguously
+    int[] bitmaps = new int[columnNum];
+    while (rowCount < fetchSize && queryExecution.hasNextResult()) {
+      TsBlock tsBlock = queryExecution.getBatchResult();
+      int currentCount = tsBlock.getPositionCount();
+      // serialize time column
+      for (int i = 0; i < currentCount; i++) {
+        dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+      }
+
+      // serialize each value column and its bitmap
+      for (int k = 0; k < columnNum; k++) {
+        DataOutputStream valueOutputStream = dataOutputStreams[2 * k + 1];
+        DataOutputStream bitmapOutputStream = dataOutputStreams[2 * (k + 1)];
+        Column column = tsBlock.getColumn(k);
+        TSDataType type = column.getDataType();
+        for (int i = 0; i < currentCount; i++) {
+          if (column.isNull(i)) {
+            bitmaps[k] = bitmaps[k] << 1;
+          } else {
+            bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+            serializeTsBlockValue(valueOutputStream, column, type, i);
+          }
+          // flush after every 8th row, counting rows across all TsBlocks seen so far
+          if ((rowCount + i + 1) % 8 == 0) {
+            bitmapOutputStream.writeByte(bitmaps[k]);
+            bitmaps[k] = 0;
+          }
+        }
+      }
+      rowCount += currentCount;
+    }
+
+    // feed the remaining bitmap bits once, left-aligned and padded with zero bits
+    int remaining = rowCount % 8;
+    if (remaining != 0) {
+      for (int k = 0; k < columnNum; k++) {
+        dataOutputStreams[2 * (k + 1)].writeByte(bitmaps[k] << (8 - remaining));
+      }
+    }
+
+    // wrap the written bytes directly so every buffer is sized exactly to its content
+    tsQueryDataSet.setTime(ByteBuffer.wrap(byteArrayOutputStreams[0].toByteArray()));
+    List<ByteBuffer> bitmapList = new LinkedList<>();
+    List<ByteBuffer> valueList = new LinkedList<>();
+    for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+      valueList.add(ByteBuffer.wrap(byteArrayOutputStreams[i].toByteArray()));
+      bitmapList.add(ByteBuffer.wrap(byteArrayOutputStreams[i + 1].toByteArray()));
+    }
+    tsQueryDataSet.setBitmapList(bitmapList);
+    tsQueryDataSet.setValueList(valueList);
+    return tsQueryDataSet;
+  }
+
+  /**
+   * Writes the non-null value at {@code index} of {@code column} to {@code out}. Fixed-width types
+   * use their {@link DataOutputStream} encoding; TEXT is written as an int length followed by the
+   * raw bytes.
+   */
+  private static void serializeTsBlockValue(
+      DataOutputStream out, Column column, TSDataType type, int index) throws IOException {
+    switch (type) {
+      case INT32:
+        out.writeInt(column.getInt(index));
+        break;
+      case INT64:
+        out.writeLong(column.getLong(index));
+        break;
+      case FLOAT:
+        out.writeFloat(column.getFloat(index));
+        break;
+      case DOUBLE:
+        out.writeDouble(column.getDouble(index));
+        break;
+      case BOOLEAN:
+        out.writeBoolean(column.getBoolean(index));
+        break;
+      case TEXT:
+        Binary binary = column.getBinary(index);
+        out.writeInt(binary.getLength());
+        out.write(binary.getValues());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", type));
+    }
+  }
+
   public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
     long[] times = new long[size];
     for (int i = 0; i < size; i++) {