Posted to commits@orc.apache.org by do...@apache.org on 2021/12/25 04:04:25 UTC

[orc] branch main updated: ORC-1060: Reduce memory usage when vectorized reading dictionary string encoding columns (#971)

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a2cb60  ORC-1060: Reduce memory usage when vectorized reading dictionary string encoding columns (#971)
3a2cb60 is described below

commit 3a2cb60e4ab6af6305c351fbdb51b98f460f64a0
Author: expxiaoli <ex...@users.noreply.github.com>
AuthorDate: Sat Dec 25 12:01:07 2021 +0800

    ORC-1060: Reduce memory usage when vectorized reading dictionary string encoding columns (#971)
    
    ### What changes were proposed in this pull request?
    
    In the old code, when dictionary-encoded string columns are read through the vectorized reader, two copies of the current stripe's dictionary data and one copy of the next stripe's dictionary data are held in memory while reading across stripes. That can make the vectorized reader use more memory than the row reader. This patch fixes the issue so that only one copy of the current stripe's dictionary data is held.
    
    The patch logic has three parts (a minimal sketch follows the list):
    
    1) Read the dictionary data directly into a primitive byte array rather than through a DynamicByteArray intermediate. Using DynamicByteArray as an intermediate keeps two copies of the current stripe's dictionary data in memory.
    
    2) Read the dictionary data lazily, only when the current batch is actually read. Previously, RecordReaderImpl's nextBatch method read the next stripe's dictionary data through advanceToNextRow, so memory held two stripes' dictionary data at once. With the lazy read, only one stripe's dictionary data is held in memory when reading across stripes.
    
    3) Before lazily reading the current stripe's dictionary data, clear the batch data's references to the previous stripe's dictionary data, so the GC can reclaim the previous stripe's dictionary memory.
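    
    For illustration, here is a minimal, standalone sketch of the pattern, assuming a simplified reader with hypothetical startStripe/nextBatch methods over plain java.io streams; it is not the actual ORC TreeReaderFactory API, which the diff below shows:
    
    ```java
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    
    // Illustrative stand-in for the reader; not the actual ORC class.
    class LazyDictionarySketch {
        private InputStream dictionaryStream;  // handed over at stripe start
        private byte[] dictionaryBuffer;       // at most one stripe's dictionary
        private boolean initDictionary = false;
    
        // Stripe start: keep the stream, defer the actual read (part 2).
        void startStripe(InputStream dictionary) {
            dictionaryStream = dictionary;
            initDictionary = false;
        }
    
        // Batch read: clear old references (part 3), then lazily load (parts 1 and 2).
        void nextBatch(byte[][] batchRefs) throws IOException {
            Arrays.fill(batchRefs, null);  // let GC reclaim the previous stripe's buffer
            if (!initDictionary) {
                // Part 1: read straight into a primitive byte[], no intermediate copy.
                dictionaryBuffer = dictionaryStream.readAllBytes();
                dictionaryStream.close();
                initDictionary = true;
            }
            for (int i = 0; i < batchRefs.length; i++) {
                batchRefs[i] = dictionaryBuffer;  // real code sets (buffer, offset, length)
            }
        }
    
        public static void main(String[] args) throws IOException {
            LazyDictionarySketch reader = new LazyDictionarySketch();
            byte[][] batch = new byte[4][];
            reader.startStripe(new ByteArrayInputStream("abcd".getBytes(StandardCharsets.UTF_8)));
            reader.nextBatch(batch);  // dictionary loads here, not at stripe start
            System.out.println(batch[0].length);  // prints 4
        }
    }
    ```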
    
    ### Why are the changes needed?
    
    Reduce memory usage.
    
    ### How was this patch tested?
    
    Pass the existing CIs.
---
 .../org/apache/orc/impl/TreeReaderFactory.java     | 76 +++++++++++++---------
 1 file changed, 47 insertions(+), 29 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index f1002a1..47ee911 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -2148,12 +2148,15 @@ public class TreeReaderFactory {
    */
   public static class StringDictionaryTreeReader extends TreeReader {
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
-    private DynamicByteArray dictionaryBuffer;
     private int[] dictionaryOffsets;
     protected IntegerReader reader;
+    private InStream lengthStream;
+    private InStream dictionaryStream;
+    private OrcProto.ColumnEncoding lengthEncoding;
 
-    private byte[] dictionaryBufferInBytesCache = null;
+    private byte[] dictionaryBuffer = null;
     private final LongColumnVector scratchlcv;
+    private boolean initDictionary = false;
 
     StringDictionaryTreeReader(int columnId, Context context) throws IOException {
       this(columnId, null, null, null, null, null, context);
@@ -2167,14 +2170,10 @@ public class TreeReaderFactory {
       if (data != null && encoding != null) {
         this.reader = createIntegerReader(encoding.getKind(), data, false, context);
       }
-
-      if (dictionary != null && encoding != null) {
-        readDictionaryStream(dictionary);
-      }
-
-      if (length != null && encoding != null) {
-        readDictionaryLengthStream(length, encoding);
-      }
+      lengthStream = length;
+      dictionaryStream = dictionary;
+      lengthEncoding = encoding;
+      initDictionary = false;
     }
 
     @Override
@@ -2190,15 +2189,14 @@ public class TreeReaderFactory {
     public void startStripe(StripePlanner planner, ReadPhase readPhase) throws IOException {
       super.startStripe(planner, readPhase);
 
-      // read the dictionary blob
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DICTIONARY_DATA);
-      InStream in = planner.getStream(name);
-      readDictionaryStream(in);
+      dictionaryStream = planner.getStream(name);
+      initDictionary = false;
 
       // read the lengths
       name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
-      in = planner.getStream(name);
+      InStream in = planner.getStream(name);
       OrcProto.ColumnEncoding encoding = planner.getEncoding(columnId);
       readDictionaryLengthStream(in, encoding);
 
@@ -2231,10 +2229,18 @@ public class TreeReaderFactory {
     private void readDictionaryStream(InStream in) throws IOException {
       if (in != null) { // Guard against empty dictionary stream.
         if (in.available() > 0) {
-          dictionaryBuffer = new DynamicByteArray(64, in.available());
-          dictionaryBuffer.readAll(in);
-          // Since its start of strip invalidate the cache.
-          dictionaryBufferInBytesCache = null;
+          // remove reference to previous dictionary buffer
+          dictionaryBuffer = null;
+          int dictionaryBufferSize = dictionaryOffsets[dictionaryOffsets.length - 1];
+          dictionaryBuffer = new byte[dictionaryBufferSize];
+          int pos = 0;
+          int chunkSize = in.available();
+          byte[] chunkBytes = new byte[chunkSize];
+          while (pos < dictionaryBufferSize) {
+            int currentLength = in.read(chunkBytes, 0, chunkSize);
+            System.arraycopy(chunkBytes, 0, dictionaryBuffer, pos, currentLength);
+            pos += currentLength;
+          }
         }
         in.close();
       } else {
@@ -2261,6 +2267,23 @@ public class TreeReaderFactory {
                            ReadPhase readPhase) throws IOException {
       final BytesColumnVector result = (BytesColumnVector) previousVector;
 
+      // remove reference to previous dictionary buffer
+      for (int i = 0; i < batchSize; i++) {
+        result.vector[i] = null;
+      }
+
+      // lazy read dictionary buffer,
+      // ensure there is at most one dictionary buffer in memory when reading cross different file stripes
+      if (!initDictionary) {
+        if (lengthStream != null && lengthEncoding != null) {
+          readDictionaryLengthStream(lengthStream, lengthEncoding);
+        }
+        if (dictionaryStream != null) {
+          readDictionaryStream(dictionaryStream);
+        }
+        initDictionary = true;
+      }
+
       // Read present/isNull stream
       super.nextVector(result, isNull, batchSize, filterContext, readPhase);
       readDictionaryByteArray(result, filterContext, batchSize);
@@ -2274,11 +2297,6 @@ public class TreeReaderFactory {
 
       if (dictionaryBuffer != null) {
 
-        // Load dictionaryBuffer into cache.
-        if (dictionaryBufferInBytesCache == null) {
-          dictionaryBufferInBytesCache = dictionaryBuffer.get();
-        }
-
         // Read string offsets
         scratchlcv.isRepeating = result.isRepeating;
         scratchlcv.noNulls = result.noNulls;
@@ -2291,7 +2309,7 @@ public class TreeReaderFactory {
           if (filterContext.isSelectedInUse()) {
             // Set all string values to null - offset and length is zero
             for (int i = 0; i < batchSize; i++) {
-              result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+              result.setRef(i, dictionaryBuffer, 0, 0);
             }
             // Read selected rows from stream
             for (int i = 0; i != filterContext.getSelectedSize(); i++) {
@@ -2299,7 +2317,7 @@ public class TreeReaderFactory {
               if (!scratchlcv.isNull[idx]) {
                 offset = dictionaryOffsets[(int) scratchlcv.vector[idx]];
                 length = getDictionaryEntryLength((int) scratchlcv.vector[idx], offset);
-                result.setRef(idx, dictionaryBufferInBytesCache, offset, length);
+                result.setRef(idx, dictionaryBuffer, offset, length);
               }
             }
           } else {
@@ -2307,10 +2325,10 @@ public class TreeReaderFactory {
               if (!scratchlcv.isNull[i]) {
                 offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
                 length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
-                result.setRef(i, dictionaryBufferInBytesCache, offset, length);
+                result.setRef(i, dictionaryBuffer, offset, length);
               } else {
                 // If the value is null then set offset and length to zero (null string)
-                result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+                result.setRef(i, dictionaryBuffer, 0, 0);
               }
             }
           }
@@ -2320,7 +2338,7 @@ public class TreeReaderFactory {
           // set all the elements to the same value
           offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
           length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
-          result.setRef(0, dictionaryBufferInBytesCache, offset, length);
+          result.setRef(0, dictionaryBuffer, offset, length);
         }
         result.isRepeating = scratchlcv.isRepeating;
       } else {
@@ -2348,7 +2366,7 @@ public class TreeReaderFactory {
       if (entry < dictionaryOffsets.length - 1) {
         length = dictionaryOffsets[entry + 1] - offset;
       } else {
-        length = dictionaryBuffer.size() - offset;
+        length = dictionaryBuffer.length - offset;
       }
       return length;
     }