You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@orc.apache.org by ga...@apache.org on 2022/11/02 09:42:13 UTC
[orc] branch main updated: ORC-1304: [C++] Fix seeking over empty PRESENT stream
This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 87d23c507 ORC-1304: [C++] Fix seeking over empty PRESENT stream
87d23c507 is described below
commit 87d23c5079d3916bce225350aac881d4db16bd76
Author: coderex2522 <re...@gmail.com>
AuthorDate: Wed Nov 2 17:42:08 2022 +0800
ORC-1304: [C++] Fix seeking over empty PRESENT stream
This closes #1299
---
c++/src/ByteRLE.cc | 21 +++++-----
c++/src/RLEv1.cc | 24 ++++++------
c++/src/RLEv1.hh | 2 +
c++/test/TestByteRle.cc | 12 ++++++
c++/test/TestReader.cc | 98 ++++++++++++++++++++++++++++++++++++++++++++++
c++/test/TestRleDecoder.cc | 14 ++++++-
6 files changed, 150 insertions(+), 21 deletions(-)
diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index e3a265dd6..261d07759 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -344,6 +344,7 @@ namespace orc {
inline void nextBuffer();
inline signed char readByte();
inline void readHeader();
+ inline void reset();
std::unique_ptr<SeekableInputStream> inputStream;
size_t remainingValues;
@@ -385,11 +386,7 @@ namespace orc {
}
}
- ByteRleDecoderImpl::ByteRleDecoderImpl(
- std::unique_ptr<SeekableInputStream> input,
- ReaderMetrics* _metrics)
- : metrics(_metrics) {
- inputStream = std::move(input);
+ void ByteRleDecoderImpl::reset() {
repeating = false;
remainingValues = 0;
value = 0;
@@ -397,6 +394,14 @@ namespace orc {
bufferEnd = nullptr;
}
+ ByteRleDecoderImpl::ByteRleDecoderImpl(
+ std::unique_ptr<SeekableInputStream> input,
+ ReaderMetrics* _metrics)
+ : metrics(_metrics) {
+ inputStream = std::move(input);
+ reset();
+ }
+
ByteRleDecoderImpl::~ByteRleDecoderImpl() {
// PASS
}
@@ -404,10 +409,8 @@ namespace orc {
void ByteRleDecoderImpl::seek(PositionProvider& location) {
// move the input stream
inputStream->seek(location);
- // force a re-read from the stream
- bufferEnd = bufferStart;
- // read a new header
- readHeader();
+ // reset the decoder status and lazily call readHeader()
+ reset();
// skip ahead the given number of records
ByteRleDecoderImpl::skip(location.next());
}
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index a913656cf..3089cc40e 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -192,27 +192,29 @@ void RleDecoderV1::readHeader() {
}
}
+void RleDecoderV1::reset() {
+ remainingValues = 0;
+ value = 0;
+ bufferStart = nullptr;
+ bufferEnd = nullptr;
+ delta = 0;
+ repeating = false;
+}
+
RleDecoderV1::RleDecoderV1(std::unique_ptr<SeekableInputStream> input,
bool hasSigned,
ReaderMetrics* _metrics)
: RleDecoder(_metrics),
inputStream(std::move(input)),
- isSigned(hasSigned),
- remainingValues(0),
- value(0),
- bufferStart(nullptr),
- bufferEnd(bufferStart),
- delta(0),
- repeating(false) {
+ isSigned(hasSigned) {
+ reset();
}
void RleDecoderV1::seek(PositionProvider& location) {
// move the input stream
inputStream->seek(location);
- // force a re-read from the stream
- bufferEnd = bufferStart;
- // read a new header
- readHeader();
+ // reset the decoder status and lazily call readHeader()
+ reset();
// skip ahead the given number of records
skip(location.next());
}
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index 0b1c44b59..5ef431acc 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -77,6 +77,8 @@ private:
inline void skipLongs(uint64_t numValues);
+ inline void reset();
+
const std::unique_ptr<SeekableInputStream> inputStream;
const bool isSigned;
uint64_t remainingValues;
diff --git a/c++/test/TestByteRle.cc b/c++/test/TestByteRle.cc
index d2715f34e..5363d58e3 100644
--- a/c++/test/TestByteRle.cc
+++ b/c++/test/TestByteRle.cc
@@ -823,6 +823,18 @@ TEST(ByteRle, testSeek) {
} while (i != 0);
}
+TEST(ByteRle, seekOverEmptyPresentStream) {
+ const char* buffer = nullptr;
+ std::unique_ptr<ByteRleDecoder> rle =
+ createByteRleDecoder(
+ std::unique_ptr<orc::SeekableInputStream>
+ (new SeekableArrayInputStream(buffer, 0, 1)),
+ getDefaultReaderMetrics());
+ std::list<uint64_t> position(2, 0);
+ PositionProvider location(position);
+ rle->seek(location);
+}
+
TEST(BooleanRle, simpleTest) {
const unsigned char buffer[] = {0x61, 0xf0, 0xfd, 0x55, 0xAA, 0x55};
std::unique_ptr<SeekableInputStream> stream
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 4e04280ab..ed8d36caa 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -676,4 +676,102 @@ namespace orc {
EXPECT_EQ(1, nestedUnionBatch.offsets.data()[1]);
}
+ TEST(TestReadIntent, testSeekOverEmptyPresentStream) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool* pool = getDefaultPool();
+ uint64_t rowCount = 5000;
+ {
+ auto type = std::unique_ptr<Type>(
+ Type::buildTypeFromString(
+ "struct<col1:struct<col2:int>,col3:struct<col4:int>,"
+ "col5:array<int>,col6:map<int,int>>"));
+ WriterOptions options;
+ options.setStripeSize(1024 * 1024)
+ .setCompressionBlockSize(1024)
+ .setCompression(CompressionKind_NONE)
+ .setMemoryPool(pool)
+ .setRowIndexStride(1000);
+
+ // the child columns of the col3,col5,col6 have the empty present stream
+ auto writer = createWriter(*type, &memStream, options);
+ auto batch = writer->createRowBatch(rowCount);
+ auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+ auto& structBatch1 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[0]);
+ auto& structBatch2 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[1]);
+ auto& listBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[2]);
+ auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[3]);
+
+ auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch1.fields[0]);
+ auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch2.fields[0]);
+ auto& longBatch3 = dynamic_cast<LongVectorBatch&>(*listBatch.elements);
+ auto& longKeyBatch = dynamic_cast<LongVectorBatch&>(*mapBatch.keys);
+ auto& longValueBatch = dynamic_cast<LongVectorBatch&>(*mapBatch.elements);
+
+ structBatch.numElements = rowCount;
+ structBatch1.numElements = rowCount;
+ structBatch2.numElements = rowCount;
+ listBatch.numElements = rowCount;
+ mapBatch.numElements = rowCount;
+ longBatch1.numElements = rowCount;
+ longBatch2.numElements = rowCount;
+ longBatch3.numElements = rowCount;
+ longKeyBatch.numElements = rowCount;
+ longValueBatch.numElements = rowCount;
+
+ structBatch1.hasNulls = false;
+ structBatch2.hasNulls = true;
+ listBatch.hasNulls = true;
+ mapBatch.hasNulls = true;
+ longBatch1.hasNulls = false;
+ longBatch2.hasNulls = true;
+ longBatch3.hasNulls = true;
+ longKeyBatch.hasNulls = true;
+ longValueBatch.hasNulls = true;
+ for (uint64_t i = 0; i < rowCount; ++i) {
+ longBatch1.data[i] = static_cast<int64_t>(i);
+ longBatch1.notNull[i] = 1;
+
+ structBatch2.notNull[i] = 0;
+ listBatch.notNull[i] = 0;
+ listBatch.offsets[i] = 0;
+ mapBatch.notNull[i] = 0;
+ longBatch2.notNull[i] = 0;
+ longBatch3.notNull[i] = 0;
+ longKeyBatch.notNull[i] = 0;
+ longValueBatch.notNull[i] = 0;
+ }
+ writer->add(*batch);
+ writer->close();
+ }
+ {
+ std::unique_ptr<InputStream> inStream(
+ new MemoryInputStream(memStream.getData(), memStream.getLength()));
+ ReaderOptions readerOptions;
+ readerOptions.setMemoryPool(*pool);
+ std::unique_ptr<Reader> reader =
+ createReader(std::move(inStream), readerOptions);
+ EXPECT_EQ(rowCount, reader->getNumberOfRows());
+ std::unique_ptr<RowReader> rowReader =
+ reader->createRowReader(RowReaderOptions());
+ auto batch = rowReader->createRowBatch(1000);
+ // seek over the empty present stream
+ rowReader->seekToRow(2000);
+ EXPECT_TRUE(rowReader->next(*batch));
+ EXPECT_EQ(1000, batch->numElements);
+ auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+ auto& structBatch1 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[0]);
+ auto& structBatch2 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[1]);
+ auto& listBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[2]);
+ auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[3]);
+
+ auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch1.fields[0]);
+ for (uint64_t i = 0; i < 1000; ++i) {
+ EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i + 2000));
+ EXPECT_TRUE(longBatch1.notNull[i]);
+ EXPECT_FALSE(structBatch2.notNull[i]);
+ EXPECT_FALSE(listBatch.notNull[i]);
+ EXPECT_FALSE(mapBatch.notNull[i]);
+ }
+ }
+ }
} // namespace
diff --git a/c++/test/TestRleDecoder.cc b/c++/test/TestRleDecoder.cc
index 04fb74700..985dfc17a 100644
--- a/c++/test/TestRleDecoder.cc
+++ b/c++/test/TestRleDecoder.cc
@@ -3002,6 +3002,18 @@ TEST(RLEv1, testLeadingNulls) {
for (size_t i = 5; i < 10; ++i) {
EXPECT_EQ(i - 4, data[i]) << "Output wrong at " << i;
}
-};
+}
+
+TEST(RLEv1, seekOverEmptyPresentStream) {
+ const char* buffer = nullptr;
+ std::unique_ptr<RleDecoder> rle =
+ createRleDecoder(std::unique_ptr<SeekableInputStream>
+ (new SeekableArrayInputStream(buffer, 0, 1)),
+ false, RleVersion_1, *getDefaultPool(),
+ getDefaultReaderMetrics());
+ std::list<uint64_t> position(2, 0);
+ PositionProvider location(position);
+ rle->seek(location);
+}
} // namespace orc