You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2019/05/24 10:02:15 UTC
[orc] branch master updated: ORC-501: [C++]Move blob ownership from
StringColumnReader to StringVectorBatch to avoid data overriden
This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new ceef536 ORC-501: [C++]Move blob ownership from StringColumnReader to StringVectorBatch to avoid data overriden
ceef536 is described below
commit ceef5367425862f3d7beb703b8bd19fca890a64a
Author: Yurui Zhou <gi...@yuruiz.com>
AuthorDate: Fri May 24 18:02:10 2019 +0800
ORC-501: [C++]Move blob ownership from StringColumnReader to StringVectorBatch to avoid data overriden
This fixes #393
---
c++/include/orc/Vector.hh | 2 ++
c++/src/ColumnReader.cc | 56 ++++++++++-------------------------------------
c++/src/Vector.cc | 3 ++-
3 files changed, 15 insertions(+), 46 deletions(-)
diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh
index 5bdf27c..629c0b7 100644
--- a/c++/include/orc/Vector.hh
+++ b/c++/include/orc/Vector.hh
@@ -122,6 +122,8 @@ namespace orc {
DataBuffer<char*> data;
// the length of each string
DataBuffer<int64_t> length;
+ // string blob
+ DataBuffer<char> blob;
};
struct StringDictionary {
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 4e36f4d..573ec89 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -601,7 +601,6 @@ namespace orc {
class StringDirectColumnReader: public ColumnReader {
private:
- DataBuffer<char> blobBuffer;
std::unique_ptr<RleDecoder> lengthRle;
std::unique_ptr<SeekableInputStream> blobStream;
const char *lastBuffer;
@@ -631,8 +630,7 @@ namespace orc {
StringDirectColumnReader::StringDirectColumnReader
(const Type& type,
StripeStreams& stripe
- ): ColumnReader(type, stripe),
- blobBuffer(stripe.getMemoryPool()) {
+ ): ColumnReader(type, stripe) {
RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
.kind());
std::unique_ptr<SeekableInputStream> stream =
@@ -722,10 +720,9 @@ namespace orc {
// Load data from the blob stream into our buffer until we have enough
// to get the rest directly out of the stream's buffer.
size_t bytesBuffered = 0;
- blobBuffer.resize(totalLength);
- char *ptr= blobBuffer.data();
+ byteBatch.blob.resize(totalLength);
+ char *ptr= byteBatch.blob.data();
while (bytesBuffered + lastBufferLength < totalLength) {
- blobBuffer.resize(bytesBuffered + lastBufferLength);
memcpy(ptr + bytesBuffered, lastBuffer, lastBufferLength);
bytesBuffered += lastBufferLength;
const void* readBuffer;
@@ -737,58 +734,27 @@ namespace orc {
lastBufferLength = static_cast<size_t>(readLength);
}
- // Set up the start pointers for the ones that will come out of the buffer.
- size_t filledSlots = 0;
- size_t usedBytes = 0;
- ptr = blobBuffer.data();
- if (notNull) {
- while (filledSlots < numValues &&
- (!notNull[filledSlots] ||
- usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
- bytesBuffered)) {
- if (notNull[filledSlots]) {
- startPtr[filledSlots] = ptr + usedBytes;
- usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
- }
- filledSlots += 1;
- }
- } else {
- while (filledSlots < numValues &&
- (usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
- bytesBuffered)) {
- startPtr[filledSlots] = ptr + usedBytes;
- usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
- filledSlots += 1;
- }
- }
-
- // do we need to complete the last value in the blob buffer?
- if (usedBytes < bytesBuffered) {
- size_t moreBytes = static_cast<size_t>(lengthPtr[filledSlots]) -
- (bytesBuffered - usedBytes);
- blobBuffer.resize(bytesBuffered + moreBytes);
- ptr = blobBuffer.data();
+ if (bytesBuffered < totalLength) {
+ size_t moreBytes = totalLength - bytesBuffered;
memcpy(ptr + bytesBuffered, lastBuffer, moreBytes);
lastBuffer += moreBytes;
lastBufferLength -= moreBytes;
- startPtr[filledSlots++] = ptr + usedBytes;
}
- // Finally, set up any remaining entries into the stream buffer
+ size_t filledSlots = 0;
+ ptr = byteBatch.blob.data();
if (notNull) {
while (filledSlots < numValues) {
if (notNull[filledSlots]) {
- startPtr[filledSlots] = const_cast<char*>(lastBuffer);
- lastBuffer += lengthPtr[filledSlots];
- lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
+ startPtr[filledSlots] = const_cast<char*>(ptr);
+ ptr += lengthPtr[filledSlots];
}
filledSlots += 1;
}
} else {
while (filledSlots < numValues) {
- startPtr[filledSlots] = const_cast<char*>(lastBuffer);
- lastBuffer += lengthPtr[filledSlots];
- lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
+ startPtr[filledSlots] = const_cast<char*>(ptr);
+ ptr += lengthPtr[filledSlots];
filledSlots += 1;
}
}
diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc
index 9043dde..1256c0c 100644
--- a/c++/src/Vector.cc
+++ b/c++/src/Vector.cc
@@ -151,7 +151,8 @@ namespace orc {
StringVectorBatch::StringVectorBatch(uint64_t capacity, MemoryPool& pool
): ColumnVectorBatch(capacity, pool),
data(pool, capacity),
- length(pool, capacity) {
+ length(pool, capacity),
+ blob(pool) {
// PASS
}