You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2022/11/02 09:42:13 UTC

[orc] branch main updated: ORC-1304: [C++] Fix seeking over empty PRESENT stream

This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 87d23c507 ORC-1304: [C++] Fix seeking over empty PRESENT stream
87d23c507 is described below

commit 87d23c5079d3916bce225350aac881d4db16bd76
Author: coderex2522 <re...@gmail.com>
AuthorDate: Wed Nov 2 17:42:08 2022 +0800

    ORC-1304: [C++] Fix seeking over empty PRESENT stream
    
    This closes #1299
---
 c++/src/ByteRLE.cc         | 21 +++++-----
 c++/src/RLEv1.cc           | 24 ++++++------
 c++/src/RLEv1.hh           |  2 +
 c++/test/TestByteRle.cc    | 12 ++++++
 c++/test/TestReader.cc     | 98 ++++++++++++++++++++++++++++++++++++++++++++++
 c++/test/TestRleDecoder.cc | 14 ++++++-
 6 files changed, 150 insertions(+), 21 deletions(-)

diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index e3a265dd6..261d07759 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -344,6 +344,7 @@ namespace orc {
     inline void nextBuffer();
     inline signed char readByte();
     inline void readHeader();
+    inline void reset();
 
     std::unique_ptr<SeekableInputStream> inputStream;
     size_t remainingValues;
@@ -385,11 +386,7 @@ namespace orc {
     }
   }
 
-  ByteRleDecoderImpl::ByteRleDecoderImpl(
-                        std::unique_ptr<SeekableInputStream> input,
-                        ReaderMetrics* _metrics)
-                        : metrics(_metrics) {
-    inputStream = std::move(input);
+  void ByteRleDecoderImpl::reset() {
     repeating = false;
     remainingValues = 0;
     value = 0;
@@ -397,6 +394,14 @@ namespace orc {
     bufferEnd = nullptr;
   }
 
+  ByteRleDecoderImpl::ByteRleDecoderImpl(
+                        std::unique_ptr<SeekableInputStream> input,
+                        ReaderMetrics* _metrics)
+                        : metrics(_metrics) {
+    inputStream = std::move(input);
+    reset();
+  }
+
   ByteRleDecoderImpl::~ByteRleDecoderImpl() {
     // PASS
   }
@@ -404,10 +409,8 @@ namespace orc {
   void ByteRleDecoderImpl::seek(PositionProvider& location) {
     // move the input stream
     inputStream->seek(location);
-    // force a re-read from the stream
-    bufferEnd = bufferStart;
-    // read a new header
-    readHeader();
+    // reset the decoder state; readHeader() is called lazily on demand
+    reset();
     // skip ahead the given number of records
     ByteRleDecoderImpl::skip(location.next());
   }
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index a913656cf..3089cc40e 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -192,27 +192,29 @@ void RleDecoderV1::readHeader() {
   }
 }
 
+void RleDecoderV1::reset() {
+  remainingValues = 0;
+  value = 0;
+  bufferStart = nullptr;
+  bufferEnd = nullptr;
+  delta = 0;
+  repeating = false;
+}
+
 RleDecoderV1::RleDecoderV1(std::unique_ptr<SeekableInputStream> input,
                            bool hasSigned,
                            ReaderMetrics* _metrics)
     : RleDecoder(_metrics),
       inputStream(std::move(input)),
-      isSigned(hasSigned),
-      remainingValues(0),
-      value(0),
-      bufferStart(nullptr),
-      bufferEnd(bufferStart),
-      delta(0),
-      repeating(false) {
+      isSigned(hasSigned) {
+  reset();
 }
 
 void RleDecoderV1::seek(PositionProvider& location) {
   // move the input stream
   inputStream->seek(location);
-  // force a re-read from the stream
-  bufferEnd = bufferStart;
-  // read a new header
-  readHeader();
+  // reset the decoder state; readHeader() is called lazily on demand
+  reset();
   // skip ahead the given number of records
   skip(location.next());
 }
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index 0b1c44b59..5ef431acc 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -77,6 +77,8 @@ private:
 
     inline void skipLongs(uint64_t numValues);
 
+    inline void reset();
+
     const std::unique_ptr<SeekableInputStream> inputStream;
     const bool isSigned;
     uint64_t remainingValues;
diff --git a/c++/test/TestByteRle.cc b/c++/test/TestByteRle.cc
index d2715f34e..5363d58e3 100644
--- a/c++/test/TestByteRle.cc
+++ b/c++/test/TestByteRle.cc
@@ -823,6 +823,18 @@ TEST(ByteRle, testSeek) {
   } while (i != 0);
 }
 
+TEST(ByteRle, seekOverEmptyPresentStream) {
+  const char* buffer = nullptr;
+  std::unique_ptr<ByteRleDecoder> rle =
+      createByteRleDecoder(
+        std::unique_ptr<orc::SeekableInputStream>
+	      (new SeekableArrayInputStream(buffer, 0, 1)),
+        getDefaultReaderMetrics());
+  std::list<uint64_t> position(2, 0);
+  PositionProvider location(position);
+  rle->seek(location);
+}
+
 TEST(BooleanRle, simpleTest) {
   const unsigned char buffer[] = {0x61, 0xf0, 0xfd, 0x55, 0xAA, 0x55};
   std::unique_ptr<SeekableInputStream> stream
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 4e04280ab..ed8d36caa 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -676,4 +676,102 @@ namespace orc {
     EXPECT_EQ(1, nestedUnionBatch.offsets.data()[1]);
   }
 
+  TEST(TestReadIntent, testSeekOverEmptyPresentStream) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    uint64_t rowCount = 5000;
+    {
+      auto type = std::unique_ptr<Type>(
+        Type::buildTypeFromString(
+          "struct<col1:struct<col2:int>,col3:struct<col4:int>,"
+          "col5:array<int>,col6:map<int,int>>"));
+      WriterOptions options;
+      options.setStripeSize(1024 * 1024)
+          .setCompressionBlockSize(1024)
+          .setCompression(CompressionKind_NONE)
+          .setMemoryPool(pool)
+          .setRowIndexStride(1000);
+
+      // the child columns of col3, col5 and col6 have empty PRESENT streams
+      auto writer = createWriter(*type, &memStream, options);
+      auto batch = writer->createRowBatch(rowCount);
+      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+      auto& structBatch1 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[0]);
+      auto& structBatch2 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[1]);
+      auto& listBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[2]);
+      auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[3]);
+
+      auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch1.fields[0]);
+      auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch2.fields[0]);
+      auto& longBatch3 = dynamic_cast<LongVectorBatch&>(*listBatch.elements);
+      auto& longKeyBatch = dynamic_cast<LongVectorBatch&>(*mapBatch.keys);
+      auto& longValueBatch = dynamic_cast<LongVectorBatch&>(*mapBatch.elements);
+
+      structBatch.numElements = rowCount;
+      structBatch1.numElements = rowCount;
+      structBatch2.numElements = rowCount;
+      listBatch.numElements = rowCount;
+      mapBatch.numElements = rowCount;
+      longBatch1.numElements = rowCount;
+      longBatch2.numElements = rowCount;
+      longBatch3.numElements = rowCount;
+      longKeyBatch.numElements = rowCount;
+      longValueBatch.numElements = rowCount;
+
+      structBatch1.hasNulls = false;
+      structBatch2.hasNulls = true;
+      listBatch.hasNulls = true;
+      mapBatch.hasNulls = true;
+      longBatch1.hasNulls = false;
+      longBatch2.hasNulls = true;
+      longBatch3.hasNulls = true;
+      longKeyBatch.hasNulls = true;
+      longValueBatch.hasNulls = true;
+      for (uint64_t i = 0; i < rowCount; ++i) {
+        longBatch1.data[i] = static_cast<int64_t>(i);
+        longBatch1.notNull[i] = 1;
+
+        structBatch2.notNull[i] = 0;
+        listBatch.notNull[i] = 0;
+        listBatch.offsets[i] = 0;
+        mapBatch.notNull[i] = 0;
+        longBatch2.notNull[i] = 0;
+        longBatch3.notNull[i] = 0;
+        longKeyBatch.notNull[i] = 0;
+        longValueBatch.notNull[i] = 0;
+      }
+      writer->add(*batch);
+      writer->close();
+    }
+    {
+      std::unique_ptr<InputStream> inStream(
+        new MemoryInputStream(memStream.getData(), memStream.getLength()));
+      ReaderOptions readerOptions;
+      readerOptions.setMemoryPool(*pool);
+      std::unique_ptr<Reader> reader =
+        createReader(std::move(inStream), readerOptions);
+      EXPECT_EQ(rowCount, reader->getNumberOfRows());
+      std::unique_ptr<RowReader> rowReader =
+        reader->createRowReader(RowReaderOptions());
+      auto batch = rowReader->createRowBatch(1000);
+      // seek over the empty present stream
+      rowReader->seekToRow(2000);
+      EXPECT_TRUE(rowReader->next(*batch));
+      EXPECT_EQ(1000, batch->numElements);
+      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+      auto& structBatch1 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[0]);
+      auto& structBatch2 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[1]);
+      auto& listBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[2]);
+      auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[3]);
+
+      auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch1.fields[0]);
+      for (uint64_t i = 0; i < 1000; ++i) {
+        EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i + 2000));
+        EXPECT_TRUE(longBatch1.notNull[i]);
+        EXPECT_FALSE(structBatch2.notNull[i]);
+        EXPECT_FALSE(listBatch.notNull[i]);
+        EXPECT_FALSE(mapBatch.notNull[i]);
+      }
+    }
+  }
 }  // namespace
diff --git a/c++/test/TestRleDecoder.cc b/c++/test/TestRleDecoder.cc
index 04fb74700..985dfc17a 100644
--- a/c++/test/TestRleDecoder.cc
+++ b/c++/test/TestRleDecoder.cc
@@ -3002,6 +3002,18 @@ TEST(RLEv1, testLeadingNulls) {
   for (size_t i = 5; i < 10; ++i) {
     EXPECT_EQ(i - 4, data[i]) << "Output wrong at " << i;
   }
-};
+}
+
+TEST(RLEv1, seekOverEmptyPresentStream) {
+  const char* buffer = nullptr;
+  std::unique_ptr<RleDecoder> rle =
+      createRleDecoder(std::unique_ptr<SeekableInputStream>
+		       (new SeekableArrayInputStream(buffer, 0, 1)),
+		       false, RleVersion_1, *getDefaultPool(),
+           getDefaultReaderMetrics());
+  std::list<uint64_t> position(2, 0);
+  PositionProvider location(position);
+  rle->seek(location);
+}
 
 }  // namespace orc