You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/13 07:48:30 UTC

[incubator-iotdb] branch f_batch_reader updated: add fill buffer in EngineDataSetWithoutValueFilter (#646)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch f_batch_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/f_batch_reader by this push:
     new 05f33f0  add fill buffer in EngineDataSetWithoutValueFilter (#646)
05f33f0 is described below

commit 05f33f01a644a788019175bc205e7ae116b20754
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Fri Dec 13 15:48:24 2019 +0800

    add fill buffer in EngineDataSetWithoutValueFilter (#646)
    
    * add fill buffer in EngineDataSetWithoutValueFilter
---
 .../dataset/EngineDataSetWithoutValueFilter.java   | 231 ++++++++++++---------
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  31 ++-
 2 files changed, 158 insertions(+), 104 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutValueFilter.java
index 364f229..6cdef0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutValueFilter.java
@@ -19,47 +19,33 @@
 
 package org.apache.iotdb.db.query.dataset;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.TreeSet;
-import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 
-/**
- * TODO implement this class as TsFile DataSetWithoutTimeGenerator.
- */
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeSet;
+
+
 public class EngineDataSetWithoutValueFilter extends QueryDataSet {
 
   private List<IBatchReader> seriesReaderWithoutValueFilterList;
 
-  private TimeValuePair[] cacheTimeValueList;
-
   private TreeSet<Long> timeHeap;
 
+  private BatchData[] cachedBatchDataArray;
 
-  private List<List<BatchData>> batchDataInFilter;
-
-  private ByteBuffer timeBuffer;
-
-  /**
-   * each buffer contains values of one time series
-   */
-  private List<ByteBuffer> valueBuffers;
-
-  /**
-   * indicate whether the points in value buffer are null
-   */
-  private List<ByteBuffer> bitmapBuffer;
+  private static final int flag = 0x01;
 
   /**
    * constructor of EngineDataSetWithoutValueFilter.
@@ -79,90 +65,150 @@ public class EngineDataSetWithoutValueFilter extends QueryDataSet {
 
   private void initHeap() throws IOException {
     timeHeap = new TreeSet<>();
-    cacheTimeValueList = new TimeValuePair[seriesReaderWithoutValueFilterList.size()];
+    cachedBatchDataArray = new BatchData[seriesReaderWithoutValueFilterList.size()];
 
     for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
       IBatchReader reader = seriesReaderWithoutValueFilterList.get(i);
-      if (reader.hasNext()) {
-        TimeValuePair timeValuePair = reader.next();
-        cacheTimeValueList[i] = timeValuePair;
-        timeHeapPut(timeValuePair.getTimestamp());
+      if (reader.hasNextBatch()) {
+        BatchData batchData = reader.nextBatch();
+        cachedBatchDataArray[i] = batchData;
+        timeHeapPut(batchData.currentTime());
       }
     }
   }
 
 
   /**
-   *
-   * Yuan Tian !!!
-   *
    * fill time buffer, value buffers and bitmap buffers
-   *
-   * @param batchReaders readers of each time series
    */
-  private void fillBuffer(List<IBatchReader> batchReaders) {
-
-  }
-
-  @Override
-  protected boolean hasNextWithoutConstraint() {
-    return !timeHeap.isEmpty();
-  }
-
-  @Override
-  protected RowRecord nextWithoutConstraint() throws IOException {
-    long minTime = timeHeapGet();
-
-    RowRecord record = new RowRecord(minTime);
+  public TSQueryDataSet fillBuffer(int fetchSize) throws IOException {
+    int columnNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+    // one time column, plus a value buffer and a bitmap buffer per value column (a set bitmap bit means the row has a value; a cleared bit means null)
+    int columnNumWithTime = columnNum * 2 + 1;
+    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
 
-    for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
-      IBatchReader reader = seriesReaderWithoutValueFilterList.get(i);
-      if (cacheTimeValueList[i] == null) {
-        record.addField(new Field(null));
-      } else {
-        if (cacheTimeValueList[i].getTimestamp() == minTime) {
-          record.addField(getField(cacheTimeValueList[i].getValue(), dataTypes.get(i)));
-          if (seriesReaderWithoutValueFilterList.get(i).hasNext()) {
-            cacheTimeValueList[i] = reader.next();
-            timeHeapPut(cacheTimeValueList[i].getTimestamp());
+    int rowCount = 0;
+    int[] valueOccupation = new int[columnNum];
+    // per-column bitmap bits for the current group of 8 rows, flushed as one byte per group
+    int[] bitmap = new int[columnNum];
+    for (int i = 0; i < fetchSize; i++) {
+      if (!timeHeap.isEmpty()) {
+        long minTime = timeHeapGet();
+        // write the timestamp into the time column (stream 0)
+        dataOutputStreams[0].writeLong(minTime);
+        for (int k = 0; k < cachedBatchDataArray.length; k++) {
+          if (cachedBatchDataArray[k] != null && !cachedBatchDataArray[k].hasNext()) {
+            if (seriesReaderWithoutValueFilterList.get(k).hasNextBatch())
+              cachedBatchDataArray[k] = seriesReaderWithoutValueFilterList.get(k).nextBatch();
+            else
+              cachedBatchDataArray[k] = null;
+          }
+          BatchData batchData = cachedBatchDataArray[k];
+          DataOutputStream dataOutputStream = dataOutputStreams[2*k + 1]; // +1 skips the time column at index 0
+          if (batchData == null || batchData.currentTime() != minTime) {
+            bitmap[k] =  (bitmap[k] << 1);
+            if (batchData != null)
+              timeHeapPut(batchData.currentTime());
+          } else {
+            bitmap[k] =  (bitmap[k] << 1) | flag;
+            TSDataType type = batchData.getDataType();
+            switch (type) {
+              case INT32:
+                dataOutputStream.writeInt(batchData.getInt());
+                valueOccupation[k] += 4;
+                break;
+              case INT64:
+                dataOutputStream.writeLong(batchData.getLong());
+                valueOccupation[k] += 8;
+                break;
+              case FLOAT:
+                dataOutputStream.writeFloat(batchData.getFloat());
+                valueOccupation[k] += 4;
+                break;
+              case DOUBLE:
+                dataOutputStream.writeDouble(batchData.getDouble());
+                valueOccupation[k] += 8;
+                break;
+              case BOOLEAN:
+                dataOutputStream.writeBoolean(batchData.getBoolean());
+                valueOccupation[k] += 1;
+                break;
+              case TEXT:
+                dataOutputStream.writeInt(batchData.getBinary().getLength());
+                dataOutputStream.write(batchData.getBinary().getValues());
+                valueOccupation[k] = valueOccupation[k] + 4 + batchData.getBinary().getLength();
+                break;
+              default:
+                throw new UnSupportedDataTypeException(
+                        String.format("Data type %s is not supported.", type));
+            }
           }
-        } else {
-          record.addField(new Field(null));
         }
+        rowCount++;
+        if (rowCount % 8 == 0) {
+          for (int j = 0; j < bitmap.length; j++) {
+            DataOutputStream dataBitmapOutputStream = dataOutputStreams[2*(j+1)];
+            dataBitmapOutputStream.writeByte(bitmap[j]);
+            // we should clear the bitmap every 8 row record
+            bitmap[j] = 0;
+          }
+        }
+      } else {
+        break;
       }
     }
 
-    return record;
-  }
-
-  private Field getField(TsPrimitiveType tsPrimitiveType, TSDataType dataType) {
-    if (tsPrimitiveType == null) {
-      return new Field(null);
+    // flush the last partial bitmap byte, padding the unused low-order bits with zeros
+    int remaining = rowCount % 8;
+    if (remaining != 0) {
+      for (int j = 0; j < bitmap.length; j++) {
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2*(j+1)];
+        dataBitmapOutputStream.writeByte(bitmap[j] << (8-remaining));
+      }
     }
-    Field field = new Field(dataType);
-    switch (dataType) {
-      case INT32:
-        field.setIntV(tsPrimitiveType.getInt());
-        break;
-      case INT64:
-        field.setLongV(tsPrimitiveType.getLong());
-        break;
-      case FLOAT:
-        field.setFloatV(tsPrimitiveType.getFloat());
-        break;
-      case DOUBLE:
-        field.setDoubleV(tsPrimitiveType.getDouble());
-        break;
-      case BOOLEAN:
-        field.setBoolV(tsPrimitiveType.getBoolean());
-        break;
-      case TEXT:
-        field.setBinaryV(tsPrimitiveType.getBinary());
-        break;
-      default:
-        throw new UnSupportedDataTypeException("UnSupported: " + dataType);
+
+    // calculate the time buffer size
+    int timeOccupation = rowCount * 8;
+    ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+    timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+    timeBuffer.flip();
+    tsQueryDataSet.setTime(timeBuffer);
+
+    // calculate the bitmap buffer size
+    int bitmapOccupation = rowCount / 8 + (rowCount % 8 == 0 ? 0 : 1);
+
+    List<ByteBuffer> bitmapList = new LinkedList<>();
+    List<ByteBuffer> valueList = new LinkedList<>();
+    for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i-1)/2]);
+      valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+      valueBuffer.flip();
+      valueList.add(valueBuffer);
+
+      ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+      bitmapBuffer.put(byteArrayOutputStreams[i+1].toByteArray());
+      bitmapBuffer.flip();
+      bitmapList.add(bitmapBuffer);
     }
-    return field;
+    tsQueryDataSet.setBitmapList(bitmapList);
+    tsQueryDataSet.setValueList(valueList);
+    return tsQueryDataSet;
+  }
+
+  @Override
+  protected boolean hasNextWithoutConstraint() throws IOException {
+    throw new IOException("The method can't be invoked, please try to use fillBuffer method directly!");
+  }
+
+  @Override
+  protected RowRecord nextWithoutConstraint() throws IOException {
+    throw new IOException("The method can't be invoked, please try to use fillBuffer method directly!");
   }
 
   /**
@@ -176,7 +222,4 @@ public class EngineDataSetWithoutValueFilter extends QueryDataSet {
     return timeHeap.pollFirst();
   }
 
-  public List<IPointReader> getReaders() {
-    return seriesReaderWithoutValueFilterList;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ddaed4e..fac6dd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.*;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutValueFilter;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -875,6 +876,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       QueryDataSet queryDataSet = queryId2DataSet.get().get(req.queryId);
       TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet);
 
+
       boolean hasResultSet = result.bufferForTime().limit() != 0;
       if (!hasResultSet) {
         queryId2DataSet.get().remove(req.queryId);
@@ -900,18 +902,27 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       throw new TException(e);
     }
     TSQueryDataSet result;
-    if (config.isEnableWatermark() && authorizer.isUserUseWaterMark(username.get())) {
-      WatermarkEncoder encoder;
-      if (config.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
-        encoder = new GroupedLSBWatermarkEncoder(config);
+    // Fast path for queries without a value filter.
+    // ATTENTION: this path does not apply watermark encoding yet,
+    // even when watermarking is enabled for the current user.
+    if (queryDataSet instanceof EngineDataSetWithoutValueFilter) {
+      result = ((EngineDataSetWithoutValueFilter)queryDataSet).fillBuffer(fetchSize);
+    }
+    // TODO: refactor the other query types in the future
+    else {
+      if (config.isEnableWatermark() && authorizer.isUserUseWaterMark(username.get())) {
+        WatermarkEncoder encoder;
+        if (config.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
+          encoder = new GroupedLSBWatermarkEncoder(config);
+        } else {
+          throw new UnSupportedDataTypeException(String.format(
+                  "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
+        }
+        result = QueryDataSetUtils
+                .convertQueryDataSetByFetchSize(queryDataSet, fetchSize, encoder);
       } else {
-        throw new UnSupportedDataTypeException(String.format(
-            "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
+        result = QueryDataSetUtils.convertQueryDataSetByFetchSize(queryDataSet, fetchSize);
       }
-      result = QueryDataSetUtils
-          .convertQueryDataSetByFetchSize(queryDataSet, fetchSize, encoder);
-    } else {
-      result = QueryDataSetUtils.convertQueryDataSetByFetchSize(queryDataSet, fetchSize);
     }
     return result;
   }