You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by xn...@apache.org on 2019/03/08 17:10:49 UTC

[orc] branch master updated: ORC-473: [C++] Fix DecompressionStream::seek to read from right position (#370)

This is an automated email from the ASF dual-hosted git repository.

xndai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 728b1d1  ORC-473: [C++] Fix DecompressionStream::seek to read from right position (#370)
728b1d1 is described below

commit 728b1d19c7fa0f09e460aea37092f76cbdefd140
Author: Gang Wu <us...@gmail.com>
AuthorDate: Fri Mar 8 09:10:38 2019 -0800

    ORC-473: [C++] Fix DecompressionStream::seek to read from right position (#370)
    
    If a DecompressionStream has already called Next() and still holds
    intermediate state (buffered input/output and decompression state),
    calling seek() can make it read from the wrong position. This patch
    clears that state so the stream reads from the requested position.
    
    Fix #370
---
 c++/src/Compression.cc      | 16 +++++++++++
 c++/test/TestCompression.cc | 67 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 83 insertions(+)

diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 44bd22d..362a641 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -565,6 +565,14 @@ DIAGNOSTIC_POP
   }
 
   void ZlibDecompressionStream::seek(PositionProvider& position) {
+    // clear state to force seek to read from the right position
+    state = DECOMPRESS_HEADER;
+    outputBuffer = nullptr;
+    outputBufferLength = 0;
+    remainingLength = 0;
+    inputBuffer = nullptr;
+    inputBufferEnd = nullptr;
+
     input->seek(position);
     bytesReturned = static_cast<off_t>(input->ByteCount());
     if (!Skip(static_cast<int>(position.next()))) {
@@ -794,6 +802,14 @@ DIAGNOSTIC_POP
   }
 
   void BlockDecompressionStream::seek(PositionProvider& position) {
+    // clear state to force seek to read from the right position
+    state = DECOMPRESS_HEADER;
+    outputBufferPtr = nullptr;
+    outputBufferLength = 0;
+    remainingLength = 0;
+    inputBufferPtr = nullptr;
+    inputBufferPtrEnd = nullptr;
+
     input->seek(position);
     if (!Skip(static_cast<int>(position.next()))) {
       throw ParseError("Bad skip in " + getName());
diff --git a/c++/test/TestCompression.cc b/c++/test/TestCompression.cc
index 9ca2160..2e1cba9 100644
--- a/c++/test/TestCompression.cc
+++ b/c++/test/TestCompression.cc
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+#include "ColumnWriter.hh"
 #include "Compression.hh"
 #include "MemoryOutputStream.hh"
 #include "RLEv1.hh"
@@ -336,4 +337,70 @@ namespace orc {
   TEST(Compression, zstd_protobuff_compression) {
     protobuff_compression(CompressionKind_ZSTD, proto::ZSTD);
   }
+
+  void testSeekDecompressionStream(CompressionKind kind) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    CompressionStrategy strategy = CompressionStrategy_COMPRESSION;
+    uint64_t batchSize = 1024, blockSize = 256;
+
+    AppendOnlyBufferedStream outStream(createCompressor(
+      kind, &memStream, strategy, DEFAULT_MEM_STREAM_SIZE, blockSize, *pool));
+
+    // write 3 batches of 1024 row numbers; record positions after batches 0 and 1
+    size_t row = 0;
+    proto::RowIndexEntry rowIndexEntry1, rowIndexEntry2;
+    RowIndexPositionRecorder recorder1(rowIndexEntry1), recorder2(rowIndexEntry2);
+    for (size_t repeat = 0; repeat != 3; ++repeat) {
+      for (size_t i = 0; i != batchSize; ++i) {
+        std::string data = std::to_string(row++);
+        outStream.write(data.c_str(), data.size());
+      }
+      if (repeat == 0) {
+        outStream.recordPosition(&recorder1);
+      } else if (repeat == 1) {
+        outStream.recordPosition(&recorder2);
+      }
+    }
+    outStream.flush();
+
+    // wrap the compressed bytes in a decompressor to read them back
+    std::unique_ptr<SeekableInputStream> inputStream(
+      new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
+    std::unique_ptr<SeekableInputStream> decompressStream =
+      createDecompressor(kind,
+                         std::move(inputStream),
+                         blockSize,
+                         *pool);
+
+    // prepare positions to seek to; fatal on mismatch since pos2 is indexed by entry1's size
+    ASSERT_EQ(rowIndexEntry1.positions_size(), rowIndexEntry2.positions_size());
+    std::list<uint64_t> pos1, pos2;
+    for (int i = 0; i < rowIndexEntry1.positions_size(); ++i) {
+      pos1.push_back(rowIndexEntry1.positions(i));
+      pos2.push_back(rowIndexEntry2.positions(i));
+    }
+    PositionProvider provider1(pos1), provider2(pos2);
+    const void* data = nullptr;
+    int size = 0;
+
+    // seek to the start of the second batch (first row there is 1024)
+    decompressStream->seek(provider1);
+    ASSERT_TRUE(decompressStream->Next(&data, &size) && size >= 4);
+    std::string data1(static_cast<const char*>(data), 4);
+    std::string expected1 = "1024";
+    EXPECT_EQ(expected1, data1);
+
+    // seek again after Next() left intermediate state: must land on row 2048
+    decompressStream->seek(provider2);
+    ASSERT_TRUE(decompressStream->Next(&data, &size) && size >= 4);
+    std::string data2(static_cast<const char*>(data), 4);
+    std::string expected2 = "2048";
+    EXPECT_EQ(expected2, data2);
+  }
+
+  TEST(Compression, seekDecompressionStream) {
+    for (CompressionKind kind : {CompressionKind_ZSTD, CompressionKind_ZLIB})
+      testSeekDecompressionStream(kind);  // run the seek regression once per codec
+  }
 }