You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2019/05/24 10:02:15 UTC

[orc] branch master updated: ORC-501: [C++]Move blob ownership from StringColumnReader to StringVectorBatch to avoid data overriden

This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new ceef536  ORC-501: [C++]Move blob ownership from StringColumnReader to StringVectorBatch to avoid data overriden
ceef536 is described below

commit ceef5367425862f3d7beb703b8bd19fca890a64a
Author: Yurui Zhou <gi...@yuruiz.com>
AuthorDate: Fri May 24 18:02:10 2019 +0800

    ORC-501: [C++]Move blob ownership from StringColumnReader to StringVectorBatch to avoid data overriden
    
    This fixes #393
---
 c++/include/orc/Vector.hh |  2 ++
 c++/src/ColumnReader.cc   | 56 ++++++++++-------------------------------------
 c++/src/Vector.cc         |  3 ++-
 3 files changed, 15 insertions(+), 46 deletions(-)

diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh
index 5bdf27c..629c0b7 100644
--- a/c++/include/orc/Vector.hh
+++ b/c++/include/orc/Vector.hh
@@ -122,6 +122,8 @@ namespace orc {
     DataBuffer<char*> data;
     // the length of each string
     DataBuffer<int64_t> length;
+    // string blob
+    DataBuffer<char> blob;
   };
 
   struct StringDictionary {
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 4e36f4d..573ec89 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -601,7 +601,6 @@ namespace orc {
 
   class StringDirectColumnReader: public ColumnReader {
   private:
-    DataBuffer<char> blobBuffer;
     std::unique_ptr<RleDecoder> lengthRle;
     std::unique_ptr<SeekableInputStream> blobStream;
     const char *lastBuffer;
@@ -631,8 +630,7 @@ namespace orc {
   StringDirectColumnReader::StringDirectColumnReader
                  (const Type& type,
                   StripeStreams& stripe
-                  ): ColumnReader(type, stripe),
-                     blobBuffer(stripe.getMemoryPool()) {
+                  ): ColumnReader(type, stripe) {
     RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
                                                 .kind());
     std::unique_ptr<SeekableInputStream> stream =
@@ -722,10 +720,9 @@ namespace orc {
     // Load data from the blob stream into our buffer until we have enough
     // to get the rest directly out of the stream's buffer.
     size_t bytesBuffered = 0;
-    blobBuffer.resize(totalLength);
-    char *ptr= blobBuffer.data();
+    byteBatch.blob.resize(totalLength);
+    char *ptr= byteBatch.blob.data();
     while (bytesBuffered + lastBufferLength < totalLength) {
-      blobBuffer.resize(bytesBuffered + lastBufferLength);
       memcpy(ptr + bytesBuffered, lastBuffer, lastBufferLength);
       bytesBuffered += lastBufferLength;
       const void* readBuffer;
@@ -737,58 +734,27 @@ namespace orc {
       lastBufferLength = static_cast<size_t>(readLength);
     }
 
-    // Set up the start pointers for the ones that will come out of the buffer.
-    size_t filledSlots = 0;
-    size_t usedBytes = 0;
-    ptr = blobBuffer.data();
-    if (notNull) {
-      while (filledSlots < numValues &&
-             (!notNull[filledSlots] ||
-              usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
-              bytesBuffered)) {
-        if (notNull[filledSlots]) {
-          startPtr[filledSlots] = ptr + usedBytes;
-          usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
-        }
-        filledSlots += 1;
-      }
-    } else {
-      while (filledSlots < numValues &&
-             (usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
-              bytesBuffered)) {
-        startPtr[filledSlots] = ptr + usedBytes;
-        usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
-        filledSlots += 1;
-      }
-    }
-
-    // do we need to complete the last value in the blob buffer?
-    if (usedBytes < bytesBuffered) {
-      size_t moreBytes = static_cast<size_t>(lengthPtr[filledSlots]) -
-        (bytesBuffered - usedBytes);
-      blobBuffer.resize(bytesBuffered + moreBytes);
-      ptr = blobBuffer.data();
+    if (bytesBuffered < totalLength) {
+      size_t moreBytes = totalLength - bytesBuffered;
       memcpy(ptr + bytesBuffered, lastBuffer, moreBytes);
       lastBuffer += moreBytes;
       lastBufferLength -= moreBytes;
-      startPtr[filledSlots++] = ptr + usedBytes;
     }
 
-    // Finally, set up any remaining entries into the stream buffer
+    size_t filledSlots = 0;
+    ptr = byteBatch.blob.data();
     if (notNull) {
       while (filledSlots < numValues) {
         if (notNull[filledSlots]) {
-          startPtr[filledSlots] = const_cast<char*>(lastBuffer);
-          lastBuffer += lengthPtr[filledSlots];
-          lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
+          startPtr[filledSlots] = const_cast<char*>(ptr);
+          ptr += lengthPtr[filledSlots];
         }
         filledSlots += 1;
       }
     } else {
       while (filledSlots < numValues) {
-        startPtr[filledSlots] = const_cast<char*>(lastBuffer);
-        lastBuffer += lengthPtr[filledSlots];
-        lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
+        startPtr[filledSlots] = const_cast<char*>(ptr);
+        ptr += lengthPtr[filledSlots];
         filledSlots += 1;
       }
     }
diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc
index 9043dde..1256c0c 100644
--- a/c++/src/Vector.cc
+++ b/c++/src/Vector.cc
@@ -151,7 +151,8 @@ namespace orc {
   StringVectorBatch::StringVectorBatch(uint64_t capacity, MemoryPool& pool
                ): ColumnVectorBatch(capacity, pool),
                   data(pool, capacity),
-                  length(pool, capacity) {
+                  length(pool, capacity),
+                  blob(pool) {
     // PASS
   }