You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by xn...@apache.org on 2019/03/08 17:10:49 UTC
[orc] branch master updated: ORC-473: [C++] Fix
DecomressionStream::seek to read from right position (#370)
This is an automated email from the ASF dual-hosted git repository.
xndai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new 728b1d1 ORC-473: [C++] Fix DecomressionStream::seek to read from right position (#370)
728b1d1 is described below
commit 728b1d19c7fa0f09e460aea37092f76cbdefd140
Author: Gang Wu <us...@gmail.com>
AuthorDate: Fri Mar 8 09:10:38 2019 -0800
ORC-473: [C++] Fix DecomressionStream::seek to read from right position (#370)
If a DecompressionStream has already called Next() to read some data and
holds intermediate state, calling seek() can cause it to read from the wrong
position. This patch clears all state and forces it to read from the right
position.
Fix #370
---
c++/src/Compression.cc | 16 +++++++++++
c++/test/TestCompression.cc | 67 +++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 83 insertions(+)
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 44bd22d..362a641 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -565,6 +565,14 @@ DIAGNOSTIC_POP
}
void ZlibDecompressionStream::seek(PositionProvider& position) {
+ // clear state to force seek to read from the right position
+ state = DECOMPRESS_HEADER;
+ outputBuffer = nullptr;
+ outputBufferLength = 0;
+ remainingLength = 0;
+ inputBuffer = nullptr;
+ inputBufferEnd = nullptr;
+
input->seek(position);
bytesReturned = static_cast<off_t>(input->ByteCount());
if (!Skip(static_cast<int>(position.next()))) {
@@ -794,6 +802,14 @@ DIAGNOSTIC_POP
}
void BlockDecompressionStream::seek(PositionProvider& position) {
+ // clear state to force seek to read from the right position
+ state = DECOMPRESS_HEADER;
+ outputBufferPtr = nullptr;
+ outputBufferLength = 0;
+ remainingLength = 0;
+ inputBufferPtr = nullptr;
+ inputBufferPtrEnd = nullptr;
+
input->seek(position);
if (!Skip(static_cast<int>(position.next()))) {
throw ParseError("Bad skip in " + getName());
diff --git a/c++/test/TestCompression.cc b/c++/test/TestCompression.cc
index 9ca2160..2e1cba9 100644
--- a/c++/test/TestCompression.cc
+++ b/c++/test/TestCompression.cc
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+#include "ColumnWriter.hh"
#include "Compression.hh"
#include "MemoryOutputStream.hh"
#include "RLEv1.hh"
@@ -336,4 +337,70 @@ namespace orc {
TEST(Compression, zstd_protobuff_compression) {
protobuff_compression(CompressionKind_ZSTD, proto::ZSTD);
}
+
+ void testSeekDecompressionStream(CompressionKind kind) {  // writes compressed data, then checks that seek() followed by Next() yields the bytes at each recorded position
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool * pool = getDefaultPool();
+ CompressionStrategy strategy = CompressionStrategy_COMPRESSION;
+ uint64_t batchSize = 1024, blockSize = 256;
+
+ AppendOnlyBufferedStream outStream(createCompressor(
+ kind, &memStream, strategy, DEFAULT_MEM_STREAM_SIZE, blockSize, *pool));
+
+ // write 3 batches of 1024 rows each (row numbers as decimal text), recording the stream position after the 1st and 2nd batch
+ size_t row = 0;
+ proto::RowIndexEntry rowIndexEntry1, rowIndexEntry2;
+ RowIndexPositionRecorder recorder1(rowIndexEntry1), recorder2(rowIndexEntry2);
+ for (size_t repeat = 0; repeat != 3; ++repeat) {
+ for (size_t i = 0; i != batchSize; ++i) {
+ std::string data = std::to_string(row++);
+ outStream.write(data.c_str(), data.size());
+ }
+ if (repeat == 0) {
+ outStream.recordPosition(&recorder1);
+ } else if (repeat == 1) {
+ outStream.recordPosition(&recorder2);
+ }
+ }
+ outStream.flush();
+
+ // wrap the compressed bytes in a decompressor created with the same kind and block size
+ std::unique_ptr<SeekableInputStream> inputStream(
+ new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
+ std::unique_ptr<SeekableInputStream> decompressStream =
+ createDecompressor(kind,
+ std::move(inputStream),
+ blockSize,
+ *pool);
+
+ // copy the recorded row-index positions into lists used to build the PositionProviders
+ EXPECT_EQ(rowIndexEntry1.positions_size(), rowIndexEntry2.positions_size());
+ std::list<uint64_t> pos1, pos2;
+ for (int i = 0; i < rowIndexEntry1.positions_size(); ++i) {
+ pos1.push_back(rowIndexEntry1.positions(i));
+ pos2.push_back(rowIndexEntry2.positions(i));
+ }
+ PositionProvider provider1(pos1), provider2(pos2);
+ const void* data;
+ int size;
+
+ // seek to the position recorded after the first batch; the next 4 bytes must be row "1024"
+ decompressStream->seek(provider1);
+ decompressStream->Next(&data, &size);
+ std::string data1(static_cast<const char*>(data), 4);
+ std::string expected1 = "1024";
+ EXPECT_EQ(expected1, data1);
+
+ // seek again on the same stream (after the previous Next() call) to the position recorded after the second batch; the next 4 bytes must be row "2048"
+ decompressStream->seek(provider2);
+ decompressStream->Next(&data, &size);
+ std::string data2(static_cast<const char*>(data), 4);
+ std::string expected2 = "2048";
+ EXPECT_EQ(expected2, data2);
+ }
+
+ TEST(Compression, seekDecompressionStream) {  // runs the seek-then-read check once with ZSTD and once with ZLIB
+ testSeekDecompressionStream(CompressionKind_ZSTD);
+ testSeekDecompressionStream(CompressionKind_ZLIB);
+ }
}