You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by do...@apache.org on 2022/04/28 20:10:25 UTC

[orc] branch branch-1.7 updated: ORC-1160: [C++] Fix seekToRow can't seek within selected row group (#1102)

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 2d4cafaa2 ORC-1160: [C++] Fix seekToRow can't seek within selected row group (#1102)
2d4cafaa2 is described below

commit 2d4cafaa29295eb168428e6773f0934ea229db49
Author: Quanlong Huang <hu...@gmail.com>
AuthorDate: Fri Apr 29 04:10:20 2022 +0800

    ORC-1160: [C++] Fix seekToRow can't seek within selected row group (#1102)
    
    ### What changes were proposed in this pull request?
    
    This cherrypicks #1100 and resolves the conflicts.
    
    ### Why are the changes needed?
    
    This fixes ORC-1160 in the 1.7 branch.
    
    ### How was this patch tested?
    
    New tests are added in #1100.
---
 c++/src/Reader.cc                 |  28 +++----
 c++/test/TestPredicatePushdown.cc | 155 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 166 insertions(+), 17 deletions(-)

diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 6644d1e1c..82001b9f7 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -350,26 +350,28 @@ namespace orc {
     previousRow = rowNumber;
     startNextStripe();
 
-    // when predicate push down is enabled, above call to startNextStripe()
-    // will move current row to 1st matching row group; here we only need
-    // to deal with the case when PPD is not enabled.
-    if (!sargsApplier) {
-      uint64_t rowsToSkip = currentRowInStripe;
-
-      if (footer->rowindexstride() > 0 &&
-          currentStripeInfo.indexlength() > 0) {
+    uint64_t rowsToSkip = currentRowInStripe;
+    auto rowIndexStride = footer->rowindexstride();
+    // seek to the target row group if row indexes exist
+    if (rowIndexStride > 0 && currentStripeInfo.indexlength() > 0) {
+      // when predicate push down is enabled, above call to startNextStripe()
+      // will move current row to 1st matching row group; here we only need
+      // to deal with the case when PPD is not enabled.
+      if (!sargsApplier) {
         if (rowIndexes.empty()) {
           loadStripeIndex();
         }
-        uint32_t rowGroupId =
-          static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride());
-        rowsToSkip -= static_cast<uint64_t>(rowGroupId) * footer->rowindexstride();
-
+        auto rowGroupId = static_cast<uint32_t>(rowsToSkip / rowIndexStride);
         if (rowGroupId != 0) {
           seekToRowGroup(rowGroupId);
         }
       }
-
+      // skip leading rows in the target row group
+      rowsToSkip %= rowIndexStride;
+    }
+    // 'reader' is reset in startNextStripe(). It could be nullptr if 'rowsToSkip' is 0,
+    // e.g. when startNextStripe() skips all remaining rows of the file.
+    if (rowsToSkip > 0) {
       reader->skip(rowsToSkip);
     }
   }
diff --git a/c++/test/TestPredicatePushdown.cc b/c++/test/TestPredicatePushdown.cc
index 41d0b532f..31ca0dffe 100644
--- a/c++/test/TestPredicatePushdown.cc
+++ b/c++/test/TestPredicatePushdown.cc
@@ -26,7 +26,7 @@ namespace orc {
 
   static const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
 
-  void createMemTestFile(MemoryOutputStream& memStream) {
+  void createMemTestFile(MemoryOutputStream& memStream, uint64_t rowIndexStride) {
     MemoryPool * pool = getDefaultPool();
     auto type = std::unique_ptr<Type>(Type::buildTypeFromString(
       "struct<int1:bigint,string1:string>"));
@@ -35,7 +35,7 @@ namespace orc {
       .setCompressionBlockSize(1024)
       .setCompression(CompressionKind_NONE)
       .setMemoryPool(pool)
-      .setRowIndexStride(1000);
+      .setRowIndexStride(rowIndexStride);
 
     auto writer = createWriter(*type, &memStream, options);
     auto batch = writer->createRowBatch(3500);
@@ -223,10 +223,96 @@ namespace orc {
     }
   }
 
+  void TestSeekWithPredicates(Reader* reader, uint64_t seekRowNumber) {
+    // Build search argument (x < 300000) for column 'int1'. Only the first row group
+    // will be selected. It has 1000 rows: (0, "0"), (300, "10"), (600, "20"), ...,
+    // (299700, "9990").
+    std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->lessThan("int1", PredicateDataType::LONG,
+                   Literal(static_cast<int64_t>(300000)))
+        .build();
+    RowReaderOptions rowReaderOpts;
+    rowReaderOpts.searchArgument(std::move(sarg));
+    auto rowReader = reader->createRowReader(rowReaderOpts);
+    auto readBatch = rowReader->createRowBatch(2000);
+    auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+    auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+    auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+    rowReader->seekToRow(seekRowNumber);
+    if (seekRowNumber >= 1000) {
+      // Seeking past the first row group goes to the end of the file
+      EXPECT_FALSE(rowReader->next(*readBatch));
+      EXPECT_EQ(0, readBatch->numElements);
+      EXPECT_EQ(3500, rowReader->getRowNumber());
+      return;
+    }
+    EXPECT_TRUE(rowReader->next(*readBatch));
+    EXPECT_EQ(1000 - seekRowNumber, readBatch->numElements);
+    EXPECT_EQ(seekRowNumber, rowReader->getRowNumber());
+    for (uint64_t i = 0; i < readBatch->numElements; ++i) {
+      EXPECT_EQ(300 * (i + seekRowNumber), batch1.data[i]);
+      EXPECT_EQ(std::to_string(10 * (i + seekRowNumber)),
+                std::string(batch2.data[i], static_cast<size_t>(batch2.length[i])));
+    }
+    EXPECT_FALSE(rowReader->next(*readBatch));
+    EXPECT_EQ(3500, rowReader->getRowNumber());
+  }
+
+  void TestMultipleSeeksWithPredicates(Reader* reader) {
+    // Build search argument (x >= 300000 AND x < 600000) for column 'int1'. Only the 2nd
+    // row group will be selected.
+    std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->startAnd()
+        .startNot()
+        .lessThan("int1", PredicateDataType::LONG,
+                  Literal(static_cast<int64_t>(300000L)))
+        .end()
+        .lessThan("int1", PredicateDataType::LONG,
+                  Literal(static_cast<int64_t>(600000L)))
+        .end()
+        .build();
+    RowReaderOptions rowReaderOpts;
+    rowReaderOpts.searchArgument(std::move(sarg));
+    auto rowReader = reader->createRowReader(rowReaderOpts);
+
+    // Read only one row after each seek
+    auto readBatch = rowReader->createRowBatch(1);
+    auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+    auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+    auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+    // Seeking within the 1st row group goes to the start of the 2nd row group
+    rowReader->seekToRow(10);
+    EXPECT_TRUE(rowReader->next(*readBatch));
+    EXPECT_EQ(1000, rowReader->getRowNumber()) << "Should start at the 2nd row group";
+    EXPECT_EQ(1, readBatch->numElements);
+    EXPECT_EQ(300000, batch1.data[0]);
+    EXPECT_EQ("10000", std::string(batch2.data[0], static_cast<size_t>(batch2.length[0])));
+
+    // Seek within the 2nd row group (1000 rows) which is selected by the search argument
+    uint64_t seekRowNum[] = {1001, 1010, 1100, 1500, 1999};
+    for (uint64_t pos : seekRowNum) {
+      rowReader->seekToRow(pos);
+      EXPECT_TRUE(rowReader->next(*readBatch));
+      EXPECT_EQ(pos, rowReader->getRowNumber());
+      EXPECT_EQ(1, readBatch->numElements);
+      EXPECT_EQ(300 * pos, batch1.data[0]);
+      EXPECT_EQ(std::to_string(10 * pos),
+                std::string(batch2.data[0], static_cast<size_t>(batch2.length[0])));
+    }
+
+    // Seeking past the 2nd row group goes to the end of the file
+    rowReader->seekToRow(2000);
+    EXPECT_FALSE(rowReader->next(*readBatch));
+    EXPECT_EQ(3500, rowReader->getRowNumber());
+    EXPECT_EQ(0, readBatch->numElements);
+  }
+
   TEST(TestPredicatePushdown, testPredicatePushdown) {
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    MemoryPool * pool = getDefaultPool();
-    createMemTestFile(memStream);
+    MemoryPool* pool = getDefaultPool();
+    createMemTestFile(memStream, 1000);
     std::unique_ptr<InputStream> inStream(new MemoryInputStream (
       memStream.getData(), memStream.getLength()));
     ReaderOptions readerOptions;
@@ -237,5 +323,66 @@ namespace orc {
     TestRangePredicates(reader.get());
     TestNoRowsSelected(reader.get());
     TestOrPredicates(reader.get());
+
+    uint64_t seekRowNumbers[] = {0, 10, 100, 500, 999, 1000, 1001, 4000};
+    for (uint64_t seekRowNumber : seekRowNumbers) {
+      TestSeekWithPredicates(reader.get(), seekRowNumber);
+    }
+
+    TestMultipleSeeksWithPredicates(reader.get());
+  }
+
+  void TestMultipleSeeksWithoutRowIndexes(Reader* reader, bool createSarg) {
+    RowReaderOptions rowReaderOpts;
+    if (createSarg) {
+      // Build search argument x < 300000 for column 'int1'. All rows will be selected
+      // since there are no row indexes in the file.
+      std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+          ->lessThan("int1", PredicateDataType::LONG,
+                     Literal(static_cast<int64_t>(300000L)))
+          .build();
+      rowReaderOpts.searchArgument(std::move(sarg));
+    }
+    auto rowReader = reader->createRowReader(rowReaderOpts);
+
+    // Read only one row after each seek
+    auto readBatch = rowReader->createRowBatch(1);
+    auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+    auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+    auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+    // Seeks within the file
+    uint64_t seekRowNum[] = {0, 1, 100, 999, 1001, 1010, 1100, 1500, 1999, 3000, 3499};
+    for (uint64_t pos : seekRowNum) {
+      rowReader->seekToRow(pos);
+      EXPECT_TRUE(rowReader->next(*readBatch));
+      EXPECT_EQ(pos, rowReader->getRowNumber());
+      EXPECT_EQ(1, readBatch->numElements);
+      EXPECT_EQ(300 * pos, batch1.data[0]);
+      EXPECT_EQ(std::to_string(10 * pos),
+                std::string(batch2.data[0], static_cast<size_t>(batch2.length[0])));
+    }
+
+    // Seeking past the end of the file
+    rowReader->seekToRow(4000);
+    EXPECT_FALSE(rowReader->next(*readBatch));
+    EXPECT_EQ(3500, rowReader->getRowNumber());
+    EXPECT_EQ(0, readBatch->numElements);
+  }
+
+  TEST(TestPredicatePushdown, testPredicatePushdownWithoutRowIndexes) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    // Create the file with rowIndexStride=0, so there are no row groups or row indexes.
+    createMemTestFile(memStream, 0);
+    std::unique_ptr<InputStream> inStream(new MemoryInputStream (
+      memStream.getData(), memStream.getLength()));
+    ReaderOptions readerOptions;
+    readerOptions.setMemoryPool(*pool);
+    std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
+    EXPECT_EQ(3500, reader->getNumberOfRows());
+
+    TestMultipleSeeksWithoutRowIndexes(reader.get(), true);
+    TestMultipleSeeksWithoutRowIndexes(reader.get(), false);
   }
 }  // namespace orc