You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/17 08:32:29 UTC

carbondata git commit: [CARBONDATA-2325]Page level uncompress and Improve query performance for unsafe no-dictionary columns

Repository: carbondata
Updated Branches:
  refs/heads/master 4a9adce46 -> ceac8abf6


[CARBONDATA-2325]Page level uncompress and Improve query performance for unsafe no-dictionary columns

Page Level Decoder for query
Added page level on demand decoding, in current code, all pages of blocklet is getting uncompressed, because of this memory footprint is too high and cause OOM, Now added code to support page level decoding, one page will be decoding when all the records are processed next page data will be decoded. It will improve query performance for example limit query.
Unsafe No Dictionary(Unsafe variable length)
Optimized getRow(for Vector processing) And putArray method

This closes #2149


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ceac8abf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ceac8abf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ceac8abf

Branch: refs/heads/master
Commit: ceac8abf619faff911c64f45d9de2386c6cfb91a
Parents: 4a9adce
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Apr 9 19:40:13 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Apr 17 16:31:18 2018 +0800

----------------------------------------------------------------------
 .../chunk/impl/DimensionRawColumnChunk.java     |   6 +
 .../chunk/impl/MeasureRawColumnChunk.java       |   7 +-
 ...feVariableLengthDimensionDataChunkStore.java | 190 ++++++++++++-------
 .../scanner/impl/BlockletFilterScanner.java     |  20 +-
 .../scan/scanner/impl/BlockletFullScanner.java  |  16 +-
 5 files changed, 134 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ceac8abf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
index c7a8337..d5320d6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -94,6 +94,11 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
    */
   public DimensionColumnPage convertToDimColDataChunkWithOutCache(int index) {
     assert index < pagesCount;
+    // in case of filter query filter column if filter column is decoded and stored.
+    // then return the same
+    if (dataChunks != null && null != dataChunks[index]) {
+      return dataChunks[index];
+    }
     try {
       return chunkReader.decodeColumnPage(this, index);
     } catch (Exception e) {
@@ -111,6 +116,7 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
         }
       }
     }
+    rawData = null;
   }
 
   public void setFileReader(FileReader fileReader) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ceac8abf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
index 2311887..9448f30 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -93,7 +93,11 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
    */
   public ColumnPage convertToColumnPageWithOutCache(int index) {
     assert index < pagesCount;
-
+    // in case of filter query filter columns blocklet pages will uncompressed
+    // so no need to decode again
+    if (null != columnPages && columnPages[index] != null) {
+      return columnPages[index];
+    }
     try {
       return chunkReader.decodeColumnPage(this, index);
     } catch (IOException | MemoryException e) {
@@ -111,6 +115,7 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
         }
       }
     }
+    rawData = null;
   }
 
   public void setFileReader(FileReader fileReader) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ceac8abf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
index 11f2ab8..07dc806 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
@@ -44,10 +44,20 @@ public class UnsafeVariableLengthDimensionDataChunkStore
    */
   private long dataPointersOffsets;
 
+  /**
+   * Reusable data array
+   * this will be useful for vector scenario, as it will be created once and filled every time
+   * if new data length is bigger than exiting data length then create new data with bigger length
+   * and assign to value
+   */
+  private byte[] value;
+
   public UnsafeVariableLengthDimensionDataChunkStore(long totalSize, boolean isInvertedIdex,
       int numberOfRows) {
     super(totalSize, isInvertedIdex, numberOfRows);
     this.numberOfRows = numberOfRows;
+    // initials size assigning to some random value
+    this.value = new byte[20];
   }
 
   /**
@@ -78,70 +88,93 @@ public class UnsafeVariableLengthDimensionDataChunkStore
 
     // start position will be used to store the current data position
     int startOffset = 0;
-    // position from where offsets will start
-    long pointerOffsets = this.dataPointersOffsets;
     // as first position will be start from 2 byte as data is stored first in the memory block
     // we need to skip first two bytes this is because first two bytes will be length of the data
     // which we have to skip
-    CarbonUnsafe.getUnsafe().putInt(dataPageMemoryBlock.getBaseObject(),
-        dataPageMemoryBlock.getBaseOffset() + pointerOffsets,
-        CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
-    // incrementing the pointers as first value is already filled and as we are storing as int
-    // we need to increment the 4 bytes to set the position of the next value to set
-    pointerOffsets += CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    int [] dataOffsets = new int[numberOfRows];
+    dataOffsets[0] = CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
     // creating a byte buffer which will wrap the length of the row
-    // using byte buffer as unsafe will return bytes in little-endian encoding
-    ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
-    // store length of data
-    byte[] length = new byte[CarbonCommonConstants.SHORT_SIZE_IN_BYTE];
-    // as first offset is already stored, we need to start from the 2nd row in data array
+    ByteBuffer buffer = ByteBuffer.wrap(data);
     for (int i = 1; i < numberOfRows; i++) {
-      // first copy the length of previous row
-      CarbonUnsafe.getUnsafe().copyMemory(dataPageMemoryBlock.getBaseObject(),
-          dataPageMemoryBlock.getBaseOffset() + startOffset, length, CarbonUnsafe.BYTE_ARRAY_OFFSET,
-          CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
-      buffer.put(length);
-      buffer.flip();
+      buffer.position(startOffset);
       // so current row position will be
       // previous row length + 2 bytes used for storing previous row data
-      startOffset += CarbonCommonConstants.SHORT_SIZE_IN_BYTE + buffer.getShort();
+      startOffset += buffer.getShort() + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
       // as same byte buffer is used to avoid creating many byte buffer for each row
       // we need to clear the byte buffer
-      buffer.clear();
-      // now put the offset of current row, here we need to add 2 more bytes as current will
-      // also have length part so we have to skip length
-      CarbonUnsafe.getUnsafe().putInt(dataPageMemoryBlock.getBaseObject(),
-          dataPageMemoryBlock.getBaseOffset() + pointerOffsets,
-          startOffset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
-      // incrementing the pointers as first value is already filled and as we are storing as int
-      // we need to increment the 4 bytes to set the position of the next value to set
-      pointerOffsets += CarbonCommonConstants.INT_SIZE_IN_BYTE;
+      dataOffsets[i] = startOffset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
     }
-
+    CarbonUnsafe.getUnsafe().copyMemory(dataOffsets, CarbonUnsafe.INT_ARRAY_OFFSET,
+        dataPageMemoryBlock.getBaseObject(),
+        dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets,
+        dataOffsets.length * CarbonCommonConstants.INT_SIZE_IN_BYTE);
   }
 
   /**
    * Below method will be used to get the row based on row id passed
-   *
+   * Getting the row from unsafe works in below logic
+   * 1. if inverted index is present then get the row id based on reverse inverted index
+   * 2. get the current row id data offset
+   * 3. if it's not a last row- get the next row offset
+   * Subtract the current row offset + 2 bytes(to skip the data length) with next row offset
+   * 4. if it's last row
+   * subtract the current row offset + 2 bytes(to skip the data length) with complete data length
    * @param rowId
    * @return row
    */
   @Override public byte[] getRow(int rowId) {
+    // get the actual row id
+    rowId = getRowId(rowId);
+    // get offset of data in unsafe
+    int currentDataOffset = getOffSet(rowId);
+    // get the data length
+    short length = getLength(rowId, currentDataOffset);
+    // create data array
+    byte[] data = new byte[length];
+    // fill the row data
+    fillRowInternal(length, data, currentDataOffset);
+    return data;
+  }
+
+  /**
+   * Returns the actual row id for data
+   * if inverted index is present then get the row id based on reverse inverted index
+   * otherwise return the same row id
+   * @param rowId row id
+   * @return actual row id
+   */
+  private int getRowId(int rowId) {
     // if column was explicitly sorted we need to get the rowid based inverted index reverse
     if (isExplicitSorted) {
       rowId = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(),
           dataPageMemoryBlock.getBaseOffset() + this.invertedIndexReverseOffset + ((long)rowId
               * CarbonCommonConstants.INT_SIZE_IN_BYTE));
     }
-    // now to get the row from memory block we need to do following thing
-    // 1. first get the current offset
-    // 2. if it's not a last row- get the next row offset
-    // Subtract the current row offset + 2 bytes(to skip the data length) with next row offset
-    // else subtract the current row offset + 2 bytes(to skip the data length)
-    // with complete data length
-    int currentDataOffset = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(),
-        dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + (rowId
+    return rowId;
+  }
+
+  /**
+   * get data offset based on current row id
+   * @param rowId row id
+   * @return data offset
+   */
+  private int getOffSet(int rowId) {
+    return CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(),
+        dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((long)rowId
             * CarbonCommonConstants.INT_SIZE_IN_BYTE));
+  }
+
+  /**
+   * To get the length of data for row id
+   * if it's not a last row- get the next row offset
+   * Subtract the current row offset + 2 bytes(to skip the data length) with next row offset
+   * if it's last row
+   * subtract the current row offset + 2 bytes(to skip the data length) with complete data length
+   * @param rowId rowId
+   * @param currentDataOffset current data offset
+   * @return length of row
+   */
+  private short getLength(int rowId, int currentDataOffset) {
     short length = 0;
     // calculating the length of data
     if (rowId < numberOfRows - 1) {
@@ -154,34 +187,70 @@ public class UnsafeVariableLengthDimensionDataChunkStore
       // for last record we need to subtract with data length
       length = (short) (this.dataLength - currentDataOffset);
     }
-    byte[] data = new byte[length];
+    return length;
+  }
+
+  /**
+   * Return the row from unsafe
+   * @param length length of the data
+   * @param data data array
+   * @param currentDataOffset current data offset
+   */
+  private void fillRowInternal(short length, byte[] data, int currentDataOffset) {
     CarbonUnsafe.getUnsafe().copyMemory(dataPageMemoryBlock.getBaseObject(),
         dataPageMemoryBlock.getBaseOffset() + currentDataOffset, data,
         CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-    return data;
   }
 
+  /**
+   *
+   * Below method will be used to put the row in vector based on row id passed
+   * Getting the row from unsafe works in below logic
+   * 1. if inverted index is present then get the row id based on reverse inverted index
+   * 2. get the current row id data offset
+   * 3. if it's not a last row- get the next row offset
+   * Subtract the current row offset + 2 bytes(to skip the data length) with next row offset
+   * 4. if it's last row
+   * subtract the current row offset + 2 bytes(to skip the data length) with complete data length
+   * @param rowId row id
+   * @param vector vector to be filled
+   * @param vectorRow vector row id
+   *
+   */
   @Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
-    byte[] value = getRow(rowId);
+    // get the row id from reverse inverted index based on row id
+    rowId = getRowId(rowId);
+    // get the current row offset
+    int currentDataOffset = getOffSet(rowId);
+    // get the row data length
+    short length = getLength(rowId, currentDataOffset);
+    // check if value length is less the current data length
+    // then create a new array else use the same
+    if (length > value.length) {
+      value = new byte[length];
+    }
+    // get the row from unsafe
+    fillRowInternal(length, value, currentDataOffset);
     DataType dt = vector.getType();
-    if ((!(dt == DataTypes.STRING) && value.length == 0) || ByteUtil.UnsafeComparer.INSTANCE
-        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
+    if ((!(dt == DataTypes.STRING) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE
+        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+            CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, value, 0, length)) {
       vector.putNull(vectorRow);
     } else {
       if (dt == DataTypes.STRING) {
-        vector.putBytes(vectorRow, 0, value.length, value);
+        vector.putBytes(vectorRow, 0, length, value);
       } else if (dt == DataTypes.BOOLEAN) {
         vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
       } else if (dt == DataTypes.SHORT) {
-        vector.putShort(vectorRow, ByteUtil.toShort(value, 0, value.length));
+        vector.putShort(vectorRow, ByteUtil.toShort(value, 0, length));
       } else if (dt == DataTypes.INT) {
-        vector.putInt(vectorRow, ByteUtil.toInt(value, 0, value.length));
+        vector.putInt(vectorRow, ByteUtil.toInt(value, 0, length));
       } else if (dt == DataTypes.LONG) {
         vector.putLong(vectorRow,
             DataTypeUtil.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0,
-                value.length));
+                length));
       } else if (dt == DataTypes.TIMESTAMP) {
-        vector.putLong(vectorRow, ByteUtil.toLong(value, 0, value.length) * 1000L);
+        vector.putLong(vectorRow, ByteUtil.toLong(value, 0, length) * 1000L);
       }
     }
   }
@@ -194,27 +263,8 @@ public class UnsafeVariableLengthDimensionDataChunkStore
    * @return compare result
    */
   @Override public int compareTo(int rowId, byte[] compareValue) {
-    // now to get the row from memory block we need to do following thing
-    // 1. first get the current offset
-    // 2. if it's not a last row- get the next row offset
-    // Subtract the current row offset + 2 bytes(to skip the data length) with next row offset
-    // else subtract the current row offset
-    // with complete data length get the offset of set of data
-    int currentDataOffset = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(),
-        dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((long) rowId
-            * CarbonCommonConstants.INT_SIZE_IN_BYTE * 1L));
-    short length = 0;
-    // calculating the length of data
-    if (rowId < numberOfRows - 1) {
-      int OffsetOfNextdata = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(),
-          dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((rowId + 1)
-              * CarbonCommonConstants.INT_SIZE_IN_BYTE));
-      length = (short) (OffsetOfNextdata - (currentDataOffset
-          + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
-    } else {
-      // for last record we need to subtract with data length
-      length = (short) (this.dataLength - currentDataOffset);
-    }
+    int currentDataOffset = getOffSet(rowId);;
+    short length = getLength(rowId, currentDataOffset);
     // as this class handles this variable length data, so filter value can be
     // smaller or bigger than than actual data, so we need to take the smaller length
     int compareResult;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ceac8abf/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index 033c3dd..ec49c77 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -260,16 +260,6 @@ public class BlockletFilterScanner extends BlockletFullScanner {
 
     DimensionColumnPage[][] dimensionColumnPages =
         new DimensionColumnPage[numDimensionChunks][numPages];
-    for (int chunkIndex = 0; chunkIndex < numDimensionChunks; chunkIndex++) {
-      if (dimensionRawColumnChunks[chunkIndex] != null) {
-        for (int pageId = 0; pageId < numPages; pageId++) {
-          dimensionColumnPages[chunkIndex][pageId] =
-              dimensionRawColumnChunks[chunkIndex].decodeColumnPage(pageId);
-        }
-      }
-    }
-
-
     MeasureRawColumnChunk[] measureRawColumnChunks =
         new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureToRead()];
     int numMeasureChunks = measureRawColumnChunks.length;
@@ -302,21 +292,13 @@ public class BlockletFilterScanner extends BlockletFullScanner {
       }
     }
     ColumnPage[][] measureColumnPages = new ColumnPage[numMeasureChunks][numPages];
-    for (int chunkIndex = 0; chunkIndex < numMeasureChunks; chunkIndex++) {
-      if (measureRawColumnChunks[chunkIndex] != null) {
-        for (int pageId = 0; pageId < numPages; pageId++) {
-          measureColumnPages[chunkIndex][pageId] =
-              measureRawColumnChunks[chunkIndex].decodeColumnPage(pageId);
-        }
-      }
-    }
-
     scannedResult.setDimensionColumnPages(dimensionColumnPages);
     scannedResult.setPageFilteredRowId(pageFilteredRowId);
     scannedResult.setMeasureColumnPages(measureColumnPages);
     scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks);
     scannedResult.setMsrRawColumnChunks(measureRawColumnChunks);
     scannedResult.setPageFilteredRowCount(pageFilteredRowCount);
+    scannedResult.fillDataChunks();
     // adding statistics for carbon scan time
     QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ceac8abf/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
index f0211dc..0cb4059 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -101,18 +101,6 @@ public class BlockletFullScanner implements BlockletScanner {
     scannedResult.setMeasureColumnPages(measureColumnPages);
     scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks);
     scannedResult.setMsrRawColumnChunks(measureRawColumnChunks);
-    if (blockExecutionInfo.isPrefetchBlocklet()) {
-      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
-        if (dimensionRawColumnChunks[i] != null) {
-          dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].decodeAllColumnPages();
-        }
-      }
-      for (int i = 0; i < measureRawColumnChunks.length; i++) {
-        if (measureRawColumnChunks[i] != null) {
-          measureColumnPages[i] = measureRawColumnChunks[i].decodeAllColumnPages();
-        }
-      }
-    }
     int[] numberOfRows = null;
     if (blockExecutionInfo.getAllSelectedDimensionColumnIndexRange().length > 0) {
       for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
@@ -145,9 +133,7 @@ public class BlockletFullScanner implements BlockletScanner {
       }
     }
     scannedResult.setPageFilteredRowCount(numberOfRows);
-    if (!blockExecutionInfo.isPrefetchBlocklet()) {
-      scannedResult.fillDataChunks();
-    }
+    scannedResult.fillDataChunks();
     // adding statistics for carbon scan time
     QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);