You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/13 07:48:30 UTC
[incubator-iotdb] branch f_batch_reader updated: add fill buffer in
EngineDataSetWithoutValueFilter (#646)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch f_batch_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/f_batch_reader by this push:
new 05f33f0 add fill buffer in EngineDataSetWithoutValueFilter (#646)
05f33f0 is described below
commit 05f33f01a644a788019175bc205e7ae116b20754
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Fri Dec 13 15:48:24 2019 +0800
add fill buffer in EngineDataSetWithoutValueFilter (#646)
* add fill buffer in EngineDataSetWithoutValueFilter
---
.../dataset/EngineDataSetWithoutValueFilter.java | 231 ++++++++++++---------
.../org/apache/iotdb/db/service/TSServiceImpl.java | 31 ++-
2 files changed, 158 insertions(+), 104 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutValueFilter.java
index 364f229..6cdef0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutValueFilter.java
@@ -19,47 +19,33 @@
package org.apache.iotdb.db.query.dataset;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.TreeSet;
-import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-/**
- * TODO implement this class as TsFile DataSetWithoutTimeGenerator.
- */
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeSet;
+
+
public class EngineDataSetWithoutValueFilter extends QueryDataSet {
private List<IBatchReader> seriesReaderWithoutValueFilterList;
- private TimeValuePair[] cacheTimeValueList;
-
private TreeSet<Long> timeHeap;
+ private BatchData[] cachedBatchDataArray;
- private List<List<BatchData>> batchDataInFilter;
-
- private ByteBuffer timeBuffer;
-
- /**
- * each buffer contains values of one time series
- */
- private List<ByteBuffer> valueBuffers;
-
- /**
- * indicate whether the points in value buffer are null
- */
- private List<ByteBuffer> bitmapBuffer;
+ private static final int flag = 0x01;
/**
* constructor of EngineDataSetWithoutValueFilter.
@@ -79,90 +65,150 @@ public class EngineDataSetWithoutValueFilter extends QueryDataSet {
private void initHeap() throws IOException {
timeHeap = new TreeSet<>();
- cacheTimeValueList = new TimeValuePair[seriesReaderWithoutValueFilterList.size()];
+ cachedBatchDataArray = new BatchData[seriesReaderWithoutValueFilterList.size()];
for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
IBatchReader reader = seriesReaderWithoutValueFilterList.get(i);
- if (reader.hasNext()) {
- TimeValuePair timeValuePair = reader.next();
- cacheTimeValueList[i] = timeValuePair;
- timeHeapPut(timeValuePair.getTimestamp());
+ if (reader.hasNextBatch()) {
+ BatchData batchData = reader.nextBatch();
+ cachedBatchDataArray[i] = batchData;
+ timeHeapPut(batchData.currentTime());
}
}
}
/**
- *
- * Yuan Tian !!!
- *
* fill time buffer, value buffers and bitmap buffers
- *
- * @param batchReaders readers of each time series
*/
- private void fillBuffer(List<IBatchReader> batchReaders) {
-
- }
-
- @Override
- protected boolean hasNextWithoutConstraint() {
- return !timeHeap.isEmpty();
- }
-
- @Override
- protected RowRecord nextWithoutConstraint() throws IOException {
- long minTime = timeHeapGet();
-
- RowRecord record = new RowRecord(minTime);
+ public TSQueryDataSet fillBuffer(int fetchSize) throws IOException {
+ int columnNum = seriesReaderWithoutValueFilterList.size();
+ TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+ // one time column and each value column has a actual value buffer and a bitmap value to indicate whether it is a null
+ int columnNumWithTime = columnNum * 2 + 1;
+ DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+ ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+ for (int i = 0; i < columnNumWithTime; i++) {
+ byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+ dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+ }
- for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
- IBatchReader reader = seriesReaderWithoutValueFilterList.get(i);
- if (cacheTimeValueList[i] == null) {
- record.addField(new Field(null));
- } else {
- if (cacheTimeValueList[i].getTimestamp() == minTime) {
- record.addField(getField(cacheTimeValueList[i].getValue(), dataTypes.get(i)));
- if (seriesReaderWithoutValueFilterList.get(i).hasNext()) {
- cacheTimeValueList[i] = reader.next();
- timeHeapPut(cacheTimeValueList[i].getTimestamp());
+ int rowCount = 0;
+ int[] valueOccupation = new int[columnNum];
+ // used to record a bitmap for every 8 row record
+ int[] bitmap = new int[columnNum];
+ for (int i = 0; i < fetchSize; i++) {
+ if (!timeHeap.isEmpty()) {
+ long minTime = timeHeapGet();
+ // use columnOutput to write byte array
+ dataOutputStreams[0].writeLong(minTime);
+ for (int k = 0; k < cachedBatchDataArray.length; k++) {
+ if (cachedBatchDataArray[k] != null && !cachedBatchDataArray[k].hasNext()) {
+ if (seriesReaderWithoutValueFilterList.get(k).hasNextBatch())
+ cachedBatchDataArray[k] = seriesReaderWithoutValueFilterList.get(k).nextBatch();
+ else
+ cachedBatchDataArray[k] = null;
+ }
+ BatchData batchData = cachedBatchDataArray[k];
+ DataOutputStream dataOutputStream = dataOutputStreams[2*k + 1]; // DO NOT FORGET +1
+ if (batchData == null || batchData.currentTime() != minTime) {
+ bitmap[k] = (bitmap[k] << 1);
+ if (batchData != null)
+ timeHeapPut(batchData.currentTime());
+ } else {
+ bitmap[k] = (bitmap[k] << 1) | flag;
+ TSDataType type = batchData.getDataType();
+ switch (type) {
+ case INT32:
+ dataOutputStream.writeInt(batchData.getInt());
+ valueOccupation[k] += 4;
+ break;
+ case INT64:
+ dataOutputStream.writeLong(batchData.getLong());
+ valueOccupation[k] += 8;
+ break;
+ case FLOAT:
+ dataOutputStream.writeFloat(batchData.getFloat());
+ valueOccupation[k] += 4;
+ break;
+ case DOUBLE:
+ dataOutputStream.writeDouble(batchData.getDouble());
+ valueOccupation[k] += 8;
+ break;
+ case BOOLEAN:
+ dataOutputStream.writeBoolean(batchData.getBoolean());
+ valueOccupation[k] += 1;
+ break;
+ case TEXT:
+ dataOutputStream.writeInt(batchData.getBinary().getLength());
+ dataOutputStream.write(batchData.getBinary().getValues());
+ valueOccupation[k] = valueOccupation[k] + 4 + batchData.getBinary().getLength();
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", type));
+ }
}
- } else {
- record.addField(new Field(null));
}
+ rowCount++;
+ if (rowCount % 8 == 0) {
+ for (int j = 0; j < bitmap.length; j++) {
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2*(j+1)];
+ dataBitmapOutputStream.writeByte(bitmap[j]);
+ // we should clear the bitmap every 8 row record
+ bitmap[j] = 0;
+ }
+ }
+ } else {
+ break;
}
}
- return record;
- }
-
- private Field getField(TsPrimitiveType tsPrimitiveType, TSDataType dataType) {
- if (tsPrimitiveType == null) {
- return new Field(null);
+ // feed the remaining bitmap
+ int remaining = rowCount % 8;
+ if (remaining != 0) {
+ for (int j = 0; j < bitmap.length; j++) {
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2*(j+1)];
+ dataBitmapOutputStream.writeByte(bitmap[j] << (8-remaining));
+ }
}
- Field field = new Field(dataType);
- switch (dataType) {
- case INT32:
- field.setIntV(tsPrimitiveType.getInt());
- break;
- case INT64:
- field.setLongV(tsPrimitiveType.getLong());
- break;
- case FLOAT:
- field.setFloatV(tsPrimitiveType.getFloat());
- break;
- case DOUBLE:
- field.setDoubleV(tsPrimitiveType.getDouble());
- break;
- case BOOLEAN:
- field.setBoolV(tsPrimitiveType.getBoolean());
- break;
- case TEXT:
- field.setBinaryV(tsPrimitiveType.getBinary());
- break;
- default:
- throw new UnSupportedDataTypeException("UnSupported: " + dataType);
+
+ // calculate the time buffer size
+ int timeOccupation = rowCount * 8;
+ ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+ timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+ timeBuffer.flip();
+ tsQueryDataSet.setTime(timeBuffer);
+
+ // calculate the bitmap buffer size
+ int bitmapOccupation = rowCount / 8 + (rowCount % 8 == 0 ? 0 : 1);
+
+ List<ByteBuffer> bitmapList = new LinkedList<>();
+ List<ByteBuffer> valueList = new LinkedList<>();
+ for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+ ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i-1)/2]);
+ valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+ valueBuffer.flip();
+ valueList.add(valueBuffer);
+
+ ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+ bitmapBuffer.put(byteArrayOutputStreams[i+1].toByteArray());
+ bitmapBuffer.flip();
+ bitmapList.add(bitmapBuffer);
}
- return field;
+ tsQueryDataSet.setBitmapList(bitmapList);
+ tsQueryDataSet.setValueList(valueList);
+ return tsQueryDataSet;
+ }
+
+ @Override
+ protected boolean hasNextWithoutConstraint() throws IOException {
+ throw new IOException("The method can't be invoked, please try to use fillBuffer method directly!");
+ }
+
+ @Override
+ protected RowRecord nextWithoutConstraint() throws IOException {
+ throw new IOException("The method can't be invoked, please try to use fillBuffer method directly!");
}
/**
@@ -176,7 +222,4 @@ public class EngineDataSetWithoutValueFilter extends QueryDataSet {
return timeHeap.pollFirst();
}
- public List<IPointReader> getReaders() {
- return seriesReaderWithoutValueFilterList;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ddaed4e..fac6dd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.*;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutValueFilter;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -875,6 +876,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
QueryDataSet queryDataSet = queryId2DataSet.get().get(req.queryId);
TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet);
+
boolean hasResultSet = result.bufferForTime().limit() != 0;
if (!hasResultSet) {
queryId2DataSet.get().remove(req.queryId);
@@ -900,18 +902,27 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
throw new TException(e);
}
TSQueryDataSet result;
- if (config.isEnableWatermark() && authorizer.isUserUseWaterMark(username.get())) {
- WatermarkEncoder encoder;
- if (config.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
- encoder = new GroupedLSBWatermarkEncoder(config);
+ // optimize for query without value filter and
+ // !!!!!!!!!!!!!!!!!!Attention !!!!!!!!!!!!!!!!!!!
+ // !!!!!!!don't support watermark now!!!!!
+ if (queryDataSet instanceof EngineDataSetWithoutValueFilter) {
+ result = ((EngineDataSetWithoutValueFilter)queryDataSet).fillBuffer(fetchSize);
+ }
+ // TODO need to refactor the other query in the future
+ else {
+ if (config.isEnableWatermark() && authorizer.isUserUseWaterMark(username.get())) {
+ WatermarkEncoder encoder;
+ if (config.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
+ encoder = new GroupedLSBWatermarkEncoder(config);
+ } else {
+ throw new UnSupportedDataTypeException(String.format(
+ "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
+ }
+ result = QueryDataSetUtils
+ .convertQueryDataSetByFetchSize(queryDataSet, fetchSize, encoder);
} else {
- throw new UnSupportedDataTypeException(String.format(
- "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
+ result = QueryDataSetUtils.convertQueryDataSetByFetchSize(queryDataSet, fetchSize);
}
- result = QueryDataSetUtils
- .convertQueryDataSetByFetchSize(queryDataSet, fetchSize, encoder);
- } else {
- result = QueryDataSetUtils.convertQueryDataSetByFetchSize(queryDataSet, fetchSize);
}
return result;
}