You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2022/04/21 16:11:49 UTC

[orc] branch main updated: ORC-1150: [C++] Optimize RowReaderImpl::computeBatchSize() by pre-computation

This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f07491fe ORC-1150: [C++] Optimize RowReaderImpl::computeBatchSize() by pre-computation
7f07491fe is described below

commit 7f07491feb3f30cc63c47e672116c75465e05575
Author: Quanlong Huang <hu...@gmail.com>
AuthorDate: Fri Apr 22 00:11:44 2022 +0800

    ORC-1150: [C++] Optimize RowReaderImpl::computeBatchSize() by pre-computation
    
    This closes #1087
---
 c++/src/Reader.cc             | 59 +++++++++++++++++++++++--------------------
 c++/src/Reader.hh             |  4 +--
 c++/src/sargs/SargsApplier.cc | 28 +++++++++++++-------
 c++/src/sargs/SargsApplier.hh | 17 ++++++++-----
 c++/test/TestReader.cc        | 46 ++++++++++++++++-----------------
 c++/test/TestSargsApplier.cc  | 12 ++++-----
 tools/test/TestFileScan.cc    |  2 +-
 7 files changed, 93 insertions(+), 75 deletions(-)

diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index b79f2132c..580326726 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -1110,7 +1110,7 @@ namespace orc {
         currentRowInStripe = advanceToNextRowGroup(currentRowInStripe,
                                                    rowsInCurrentStripe,
                                                    footer->rowindexstride(),
-                                                   sargsApplier->getRowGroups());
+                                                   sargsApplier->getNextSkippedRows());
         previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1;
         if (currentRowInStripe > 0) {
           seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()));
@@ -1141,7 +1141,7 @@ namespace orc {
                                     currentRowInStripe,
                                     rowsInCurrentStripe,
                                     footer->rowindexstride(),
-                                    sargsApplier->getRowGroups());
+                                    sargsApplier->getNextSkippedRows());
     }
     data.numElements = rowsToRead;
     if (rowsToRead == 0) {
@@ -1165,7 +1165,7 @@ namespace orc {
       uint64_t nextRowToRead = advanceToNextRowGroup(currentRowInStripe,
                                                      rowsInCurrentStripe,
                                                      footer->rowindexstride(),
-                                                     sargsApplier->getRowGroups());
+                                                     sargsApplier->getNextSkippedRows());
       if (currentRowInStripe != nextRowToRead) {
         // it is guaranteed to be at start of a row group
         currentRowInStripe = nextRowToRead;
@@ -1186,21 +1186,18 @@ namespace orc {
                                            uint64_t currentRowInStripe,
                                            uint64_t rowsInCurrentStripe,
                                            uint64_t rowIndexStride,
-                                           const std::vector<bool>& includedRowGroups) {
+                                           const std::vector<uint64_t>& nextSkippedRows) {
     // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
     // groups are selected then marker position is set to the end of range (subset of row groups
     // within stripe).
     uint64_t endRowInStripe = rowsInCurrentStripe;
-    if (!includedRowGroups.empty()) {
-      endRowInStripe = currentRowInStripe;
-      uint32_t rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
-      for (; rg < includedRowGroups.size(); ++rg) {
-        if (!includedRowGroups[rg]) {
-          break;
-        } else {
-          endRowInStripe = std::min(rowsInCurrentStripe, (rg + 1) * rowIndexStride);
-        }
-      }
+    uint64_t groupsInStripe = nextSkippedRows.size();
+    if (groupsInStripe > 0) {
+      auto rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
+      if (rg >= groupsInStripe) return 0;
+      uint64_t nextSkippedRow = nextSkippedRows[rg];
+      if (nextSkippedRow == 0) return 0;
+      endRowInStripe = nextSkippedRow;
     }
     return std::min(requestedSize, endRowInStripe - currentRowInStripe);
   }
@@ -1208,19 +1205,27 @@ namespace orc {
   uint64_t RowReaderImpl::advanceToNextRowGroup(uint64_t currentRowInStripe,
                                                 uint64_t rowsInCurrentStripe,
                                                 uint64_t rowIndexStride,
-                                                const std::vector<bool>& includedRowGroups) {
-    if (!includedRowGroups.empty()) {
-      uint32_t rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
-      for (; rg < includedRowGroups.size(); ++rg) {
-        if (includedRowGroups[rg]) {
-          return currentRowInStripe;
-        } else {
-          // advance to start of next row group
-          currentRowInStripe = (rg + 1) * rowIndexStride;
-        }
-      }
-    }
-    return std::min(currentRowInStripe, rowsInCurrentStripe);
+                                                const std::vector<uint64_t>& nextSkippedRows) {
+    auto groupsInStripe = nextSkippedRows.size();
+    if (groupsInStripe == 0) {
+      // No PPD: keep using the current row in stripe
+      return std::min(currentRowInStripe, rowsInCurrentStripe);
+    }
+    auto rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
+    if (rg >= groupsInStripe) {
+      // Points to the end of the stripe
+      return rowsInCurrentStripe;
+    }
+    if (nextSkippedRows[rg] != 0) {
+      // Current row group is selected
+      return currentRowInStripe;
+    }
+    // Advance to the next selected row group
+    while (rg < groupsInStripe && nextSkippedRows[rg] == 0) ++rg;
+    if (rg < groupsInStripe) {
+      return rg * rowIndexStride;
+    }
+    return rowsInCurrentStripe;
   }
 
   std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 0facafc44..255bb8d25 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -175,13 +175,13 @@ namespace orc {
                                      uint64_t currentRowInStripe,
                                      uint64_t rowsInCurrentStripe,
                                      uint64_t rowIndexStride,
-                                     const std::vector<bool>& includedRowGroups);
+                                     const std::vector<uint64_t>& nextSkippedRows);
 
     // Skip non-selected rows
     static uint64_t advanceToNextRowGroup(uint64_t currentRowInStripe,
                                           uint64_t rowsInCurrentStripe,
                                           uint64_t rowIndexStride,
-                                          const std::vector<bool>& includedRowGroups);
+                                          const std::vector<uint64_t>& nextSkippedRows);
 
     friend class TestRowReader_advanceToNextRowGroup_Test;
     friend class TestRowReader_computeBatchSize_Test;
diff --git a/c++/src/sargs/SargsApplier.cc b/c++/src/sargs/SargsApplier.cc
index b4a736753..42a554f5c 100644
--- a/c++/src/sargs/SargsApplier.cc
+++ b/c++/src/sargs/SargsApplier.cc
@@ -71,7 +71,7 @@ namespace orc {
     // init state of each row group
     uint64_t groupsInStripe =
       (rowsInStripe + mRowIndexStride - 1) / mRowIndexStride;
-    mRowGroups.resize(groupsInStripe, true);
+    mNextSkippedRows.resize(groupsInStripe);
     mTotalRowsInStripe = rowsInStripe;
 
     // row indexes do not exist, simply read all rows
@@ -85,7 +85,10 @@ namespace orc {
       leaves.size(), TruthValue::YES_NO_NULL);
     mHasSelected = false;
     mHasSkipped = false;
-    for (size_t rowGroup = 0; rowGroup != groupsInStripe; ++rowGroup) {
+    uint64_t nextSkippedRowGroup = groupsInStripe;
+    size_t rowGroup = groupsInStripe;
+    do {
+      --rowGroup;
       for (size_t pred = 0; pred != leaves.size(); ++pred) {
         uint64_t columnIdx = mFilterColumns[pred];
         auto rowIndexIter = rowIndexes.find(columnIdx);
@@ -110,14 +113,21 @@ namespace orc {
         }
       }
 
-      mRowGroups[rowGroup] = isNeeded(mSearchArgument->evaluate(leafValues));
-      mHasSelected = mHasSelected || mRowGroups[rowGroup];
-      mHasSkipped = mHasSkipped || (!mRowGroups[rowGroup]);
-    }
+      bool needed = isNeeded(mSearchArgument->evaluate(leafValues));
+      if (!needed) {
+        mNextSkippedRows[rowGroup] = 0;
+        nextSkippedRowGroup = rowGroup;
+      } else {
+        mNextSkippedRows[rowGroup] = (nextSkippedRowGroup == groupsInStripe) ?
+                                     rowsInStripe : (nextSkippedRowGroup * mRowIndexStride);
+      }
+      mHasSelected |= needed;
+      mHasSkipped |= !needed;
+    } while (rowGroup != 0);
 
     // update stats
     mStats.first = std::accumulate(
-      mRowGroups.cbegin(), mRowGroups.cend(), mStats.first,
+      mNextSkippedRows.cbegin(), mNextSkippedRows.cend(), mStats.first,
       [](bool rg, uint64_t s) { return rg ? 1 : 0 + s; });
     mStats.second += groupsInStripe;
 
@@ -156,8 +166,8 @@ namespace orc {
 
     bool ret = evaluateColumnStatistics(stripeStats.colstats());
     if (!ret) {
-      // reset mRowGroups when the current stripe does not satisfy the PPD
-      mRowGroups.clear();
+      // reset mNextSkippedRows when the current stripe does not satisfy the PPD
+      mNextSkippedRows.clear();
     }
     return ret;
   }
diff --git a/c++/src/sargs/SargsApplier.hh b/c++/src/sargs/SargsApplier.hh
index 1842828d5..d8bdf852d 100644
--- a/c++/src/sargs/SargsApplier.hh
+++ b/c++/src/sargs/SargsApplier.hh
@@ -60,10 +60,11 @@ namespace orc {
                       const std::map<uint32_t, BloomFilterIndex>& bloomFilters);
 
     /**
-     * Return a vector of bool for each row group for their selection
-     * in the last evaluation
+     * Return the next skipped row for each RowGroup: the row id in stripe where the
+     * selected range containing it ends. 0 means the RowGroup is entirely skipped.
+     * Only valid after invoking pickRowGroups().
      */
-    const std::vector<bool>& getRowGroups() const { return mRowGroups; }
+    const std::vector<uint64_t>& getNextSkippedRows() const { return mNextSkippedRows; }
 
     /**
      * Indicate whether any row group is selected in the last evaluation
@@ -80,8 +81,8 @@ namespace orc {
      */
     bool hasSelectedFrom(uint64_t currentRowInStripe) const {
       uint64_t rg = currentRowInStripe / mRowIndexStride;
-      for (; rg < mRowGroups.size(); ++rg) {
-        if (mRowGroups[rg]) {
+      for (; rg < mNextSkippedRows.size(); ++rg) {
+        if (mNextSkippedRows[rg]) {
           return true;
         }
       }
@@ -111,8 +112,10 @@ namespace orc {
     // column ids for each predicate leaf in the search argument
     std::vector<uint64_t> mFilterColumns;
 
-    // store results of last call of pickRowGroups
-    std::vector<bool> mRowGroups;
+    // Maps each RowGroup index to the next skipped row (row id in stripe) after
+    // the selected range containing it, or to 0 if the RowGroup is not selected.
+    // Calculated in pickRowGroups().
+    std::vector<uint64_t> mNextSkippedRows;
     uint64_t mTotalRowsInStripe;
     bool mHasSelected;
     bool mHasSkipped;
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 4044e0399..237deab05 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -58,55 +58,55 @@ namespace orc {
   TEST(TestRowReader, computeBatchSize) {
     uint64_t rowIndexStride = 100;
     uint64_t rowsInCurrentStripe = 100 * 8 + 50;
-    std::vector<bool> includedRowGroups =
-      { false, false, true, true, false, false, true, true, false };
+    std::vector<uint64_t> nextSkippedRows =
+      { 0, 0, 400, 400, 0, 0, 800, 800, 0 };
 
     EXPECT_EQ(0, RowReaderImpl::computeBatchSize(
-      1024, 0, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      1024, 0, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(0, RowReaderImpl::computeBatchSize(
-      1024, 50, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      1024, 50, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(200, RowReaderImpl::computeBatchSize(
-      1024, 200, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      1024, 200, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(150, RowReaderImpl::computeBatchSize(
-      1024, 250, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      1024, 250, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(0, RowReaderImpl::computeBatchSize(
-      1024, 550, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      1024, 550, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(100, RowReaderImpl::computeBatchSize(
-      1024, 700, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      1024, 700, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(50, RowReaderImpl::computeBatchSize(
-      50, 700, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      50, 700, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(0, RowReaderImpl::computeBatchSize(
-      50, 810, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      50, 810, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(0, RowReaderImpl::computeBatchSize(
-      50, 900, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      50, 900, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
   }
 
   TEST(TestRowReader, advanceToNextRowGroup) {
     uint64_t rowIndexStride = 100;
     uint64_t rowsInCurrentStripe = 100 * 8 + 50;
-    std::vector<bool> includedRowGroups =
-      { false, false, true, true, false, false, true, true, false };
+    std::vector<uint64_t> nextSkippedRows =
+      { 0, 0, 400, 400, 0, 0, 800, 800, 0 };
 
     EXPECT_EQ(200, RowReaderImpl::advanceToNextRowGroup(
-      0, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      0, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(200, RowReaderImpl::advanceToNextRowGroup(
-      150, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      150, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(250, RowReaderImpl::advanceToNextRowGroup(
-      250, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      250, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(350, RowReaderImpl::advanceToNextRowGroup(
-      350, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      350, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(350, RowReaderImpl::advanceToNextRowGroup(
-      350, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      350, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(600, RowReaderImpl::advanceToNextRowGroup(
-      500, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      500, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(699, RowReaderImpl::advanceToNextRowGroup(
-      699, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      699, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(799, RowReaderImpl::advanceToNextRowGroup(
-      799, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      799, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(850, RowReaderImpl::advanceToNextRowGroup(
-      800, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      800, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
     EXPECT_EQ(850, RowReaderImpl::advanceToNextRowGroup(
-      900, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+      900, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
   }
 
   void CheckFileWithSargs(const char* fileName, const char* softwareVersion) {
diff --git a/c++/test/TestSargsApplier.cc b/c++/test/TestSargsApplier.cc
index 2ec8c3cb8..74fcae297 100644
--- a/c++/test/TestSargsApplier.cc
+++ b/c++/test/TestSargsApplier.cc
@@ -112,12 +112,12 @@ namespace orc {
     // evaluate row group index
     SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
     EXPECT_TRUE(applier.pickRowGroups(4000, rowIndexes, {}));
-    std::vector<bool> rowgroups = applier.getRowGroups();
-    EXPECT_EQ(4, rowgroups.size());
-    EXPECT_EQ(false, rowgroups[0]);
-    EXPECT_EQ(false, rowgroups[1]);
-    EXPECT_EQ(false, rowgroups[2]);
-    EXPECT_EQ(true, rowgroups[3]);
+    const auto& nextSkippedRows = applier.getNextSkippedRows();
+    EXPECT_EQ(4, nextSkippedRows.size());
+    EXPECT_EQ(0, nextSkippedRows[0]);
+    EXPECT_EQ(0, nextSkippedRows[1]);
+    EXPECT_EQ(0, nextSkippedRows[2]);
+    EXPECT_EQ(4000, nextSkippedRows[3]);
   }
 
   TEST(TestSargsApplier, testStripeAndFileStats) {
diff --git a/tools/test/TestFileScan.cc b/tools/test/TestFileScan.cc
index 0aaa689d9..c9c1eacc1 100644
--- a/tools/test/TestFileScan.cc
+++ b/tools/test/TestFileScan.cc
@@ -206,7 +206,7 @@ void checkForError(const std::string& filename, const std::string& error_msg) {
   std::string error;
   EXPECT_EQ(1, runProgram({pgm, filename}, output, error));
   EXPECT_EQ("", output);
-  EXPECT_NE(std::string::npos, error.find(error_msg));
+  EXPECT_NE(std::string::npos, error.find(error_msg)) << error;
 }
 
 TEST (TestFileScan, testErrorHandling) {