You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/12 08:27:13 UTC
[iotdb] 01/01: [IOTDB-2889] Add method converting TsBlock to TSQueryDataSet in QueryDataSetUtils
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8227c55084efc2c1a3a130591c3020082b0f2a1e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Apr 12 16:26:59 2022 +0800
[IOTDB-2889] Add method converting TsBlock to TSQueryDataSet in QueryDataSetUtils
---
.../iotdb/db/mpp/execution/QueryExecution.java | 18 ++-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 177 ++++++++++++++++++++-
2 files changed, 192 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 78458122d7..e70733b50e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -170,6 +170,22 @@ public class QueryExecution implements IQueryExecution {
}
}
+  /**
+   * Reports whether more TsBlocks can still be fetched, initializing the result handle first if
+   * that has not happened yet.
+   *
+   * @return true if there are more TsBlocks, otherwise false
+   * @throws RuntimeException wrapping the cause of an {@link IOException} raised while the result
+   *     handle is initialized (unchecked causes are rethrown as they are)
+   */
+  public boolean hasNextResult() {
+    try {
+      initialResultHandle();
+      // a finished handle has nothing left to read, so "has next" is the negation of isFinished()
+      return !resultHandle.isFinished();
+    } catch (IOException e) {
+      throwIfUnchecked(e.getCause());
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+ /**
+ * Returns the result column count without the time column.
+ *
+ * <p>NOTE(review): the count is hard-coded to 1 here; this looks like a placeholder - confirm.
+ */
+ public int getOutputValueColumnCount() {
+ return 1;
+ }
+
/**
* This method is a synchronized method. For READ, it will block until all the FragmentInstances
* have been submitted. For WRITE, it will block until all the FragmentInstances have finished.
@@ -204,7 +220,7 @@ public class QueryExecution implements IQueryExecution {
}
}
- private synchronized void initialResultHandle() throws IOException {
+ private void initialResultHandle() throws IOException {
if (this.resultHandle == null) {
this.resultHandle =
DataBlockService.getInstance()
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 075e5a2ba7..c65e57f208 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,12 +18,15 @@
*/
package org.apache.iotdb.db.utils;
+import org.apache.iotdb.db.mpp.execution.QueryExecution;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
@@ -40,7 +43,7 @@ import java.util.List;
/** TimeValuePairUtils to convert between thrift format and TsFile format. */
public class QueryDataSetUtils {
- private static final int flag = 0x01;
+ private static final int FLAG = 0x01;
private QueryDataSetUtils() {}
@@ -88,7 +91,7 @@ public class QueryDataSetUtils {
if (field == null || field.getDataType() == null) {
bitmap[k] = (bitmap[k] << 1);
} else {
- bitmap[k] = (bitmap[k] << 1) | flag;
+ bitmap[k] = (bitmap[k] << 1) | FLAG;
TSDataType type = field.getDataType();
switch (type) {
case INT32:
@@ -173,6 +176,176 @@ public class QueryDataSetUtils {
return tsQueryDataSet;
}
+ public static TSQueryDataSet convertTsBlockByFetchSize(
+ QueryExecution queryExecution, int fetchSize) throws IOException {
+ int columnNum = queryExecution.getOutputValueColumnCount();
+ TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+ // one time column and each value column has a actual value buffer and a bitmap value to
+ // indicate whether it is a null
+ int columnNumWithTime = columnNum * 2 + 1;
+ DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+ ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+ for (int i = 0; i < columnNumWithTime; i++) {
+ byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+ dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+ }
+
+ int rowCount = 0;
+ int[] valueOccupation = new int[columnNum];
+ while (rowCount < fetchSize && queryExecution.hasNextResult()) {
+ TsBlock tsBlock = queryExecution.getBatchResult();
+ int currentCount = tsBlock.getPositionCount();
+ for (int i = 0; i < currentCount; i++) {
+ // use columnOutput to write byte array
+ dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+ }
+ for (int k = 0; k < columnNum; k++) {
+ DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
+ // used to record a bitmap for every 8 row record
+ int bitmap = 0;
+ Column column = tsBlock.getColumn(k);
+ TSDataType type = column.getDataType();
+ switch (type) {
+ case INT32:
+ for (int i = 0; i < currentCount; i++) {
+ if (column.isNull(i)) {
+ bitmap = bitmap << 1;
+ } else {
+ bitmap = (bitmap << 1) | FLAG;
+ dataOutputStream.writeInt(column.getInt(i));
+ valueOccupation[k] += 4;
+ }
+ if (i % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmap);
+ // we should clear the bitmap every 8 row record
+ bitmap = 0;
+ }
+ }
+ break;
+ case INT64:
+ for (int i = 0; i < currentCount; i++) {
+ if (column.isNull(i)) {
+ bitmap = bitmap << 1;
+ } else {
+ bitmap = (bitmap << 1) | FLAG;
+ dataOutputStream.writeLong(column.getLong(i));
+ valueOccupation[k] += 8;
+ }
+ if (i % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmap);
+ // we should clear the bitmap every 8 row record
+ bitmap = 0;
+ }
+ }
+ break;
+ case FLOAT:
+ for (int i = 0; i < currentCount; i++) {
+ if (column.isNull(i)) {
+ bitmap = bitmap << 1;
+ } else {
+ bitmap = (bitmap << 1) | FLAG;
+ dataOutputStream.writeFloat(column.getFloat(i));
+ valueOccupation[k] += 4;
+ }
+ if (i % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmap);
+ // we should clear the bitmap every 8 row record
+ bitmap = 0;
+ }
+ }
+ break;
+ case DOUBLE:
+ for (int i = 0; i < currentCount; i++) {
+ if (column.isNull(i)) {
+ bitmap = bitmap << 1;
+ } else {
+ bitmap = (bitmap << 1) | FLAG;
+ dataOutputStream.writeDouble(column.getDouble(i));
+ valueOccupation[k] += 8;
+ }
+ if (i % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmap);
+ // we should clear the bitmap every 8 row record
+ bitmap = 0;
+ }
+ }
+ break;
+ case BOOLEAN:
+ for (int i = 0; i < currentCount; i++) {
+ if (column.isNull(i)) {
+ bitmap = bitmap << 1;
+ } else {
+ bitmap = (bitmap << 1) | FLAG;
+ dataOutputStream.writeBoolean(column.getBoolean(i));
+ valueOccupation[k] += 1;
+ }
+ if (i % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmap);
+ // we should clear the bitmap every 8 row record
+ bitmap = 0;
+ }
+ }
+ break;
+ case TEXT:
+ for (int i = 0; i < currentCount; i++) {
+ if (column.isNull(i)) {
+ bitmap = bitmap << 1;
+ } else {
+ bitmap = (bitmap << 1) | FLAG;
+ Binary binary = column.getBinary(i);
+ dataOutputStream.writeInt(binary.getLength());
+ dataOutputStream.write(binary.getValues());
+ valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
+ }
+ if (i % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmap);
+ // we should clear the bitmap every 8 row record
+ bitmap = 0;
+ }
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", type));
+ }
+ // feed the remaining bitmap
+ int remaining = currentCount % 8;
+ if (remaining != 0) {
+ dataBitmapOutputStream.writeByte(bitmap << (8 - remaining));
+ }
+ }
+ rowCount += currentCount;
+ }
+
+ // calculate the time buffer size
+ int timeOccupation = rowCount * 8;
+ ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+ timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+ timeBuffer.flip();
+ tsQueryDataSet.setTime(timeBuffer);
+
+ // calculate the bitmap buffer size
+ int bitmapOccupation = (rowCount + 7) / 8;
+
+ List<ByteBuffer> bitmapList = new LinkedList<>();
+ List<ByteBuffer> valueList = new LinkedList<>();
+ for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+ ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
+ valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+ valueBuffer.flip();
+ valueList.add(valueBuffer);
+
+ ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+ bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
+ bitmapBuffer.flip();
+ bitmapList.add(bitmapBuffer);
+ }
+ tsQueryDataSet.setBitmapList(bitmapList);
+ tsQueryDataSet.setValueList(valueList);
+ return tsQueryDataSet;
+ }
+
public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
long[] times = new long[size];
for (int i = 0; i < size; i++) {