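You are viewing a plain-text version of this content. The canonical version is linked from the original mailing-list archive page; the hyperlink itself ("here") was not preserved in this plain-text copy.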
Posted to commits@orc.apache.org by ga...@apache.org on 2022/04/21 16:11:49 UTC
[orc] branch main updated: ORC-1150: [C++] Optimize RowReaderImpl::computeBatchSize() by pre-computation
This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 7f07491fe ORC-1150: [C++] Optimize RowReaderImpl::computeBatchSize() by pre-computation
7f07491fe is described below
commit 7f07491feb3f30cc63c47e672116c75465e05575
Author: Quanlong Huang <hu...@gmail.com>
AuthorDate: Fri Apr 22 00:11:44 2022 +0800
ORC-1150: [C++] Optimize RowReaderImpl::computeBatchSize() by pre-computation
This closes #1087
---
c++/src/Reader.cc | 59 +++++++++++++++++++++++--------------------
c++/src/Reader.hh | 4 +--
c++/src/sargs/SargsApplier.cc | 28 +++++++++++++-------
c++/src/sargs/SargsApplier.hh | 17 ++++++++-----
c++/test/TestReader.cc | 46 ++++++++++++++++-----------------
c++/test/TestSargsApplier.cc | 12 ++++-----
tools/test/TestFileScan.cc | 2 +-
7 files changed, 93 insertions(+), 75 deletions(-)
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index b79f2132c..580326726 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -1110,7 +1110,7 @@ namespace orc {
currentRowInStripe = advanceToNextRowGroup(currentRowInStripe,
rowsInCurrentStripe,
footer->rowindexstride(),
- sargsApplier->getRowGroups());
+ sargsApplier->getNextSkippedRows());
previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1;
if (currentRowInStripe > 0) {
seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()));
@@ -1141,7 +1141,7 @@ namespace orc {
currentRowInStripe,
rowsInCurrentStripe,
footer->rowindexstride(),
- sargsApplier->getRowGroups());
+ sargsApplier->getNextSkippedRows());
}
data.numElements = rowsToRead;
if (rowsToRead == 0) {
@@ -1165,7 +1165,7 @@ namespace orc {
uint64_t nextRowToRead = advanceToNextRowGroup(currentRowInStripe,
rowsInCurrentStripe,
footer->rowindexstride(),
- sargsApplier->getRowGroups());
+ sargsApplier->getNextSkippedRows());
if (currentRowInStripe != nextRowToRead) {
// it is guaranteed to be at start of a row group
currentRowInStripe = nextRowToRead;
@@ -1186,21 +1186,18 @@ namespace orc {
uint64_t currentRowInStripe,
uint64_t rowsInCurrentStripe,
uint64_t rowIndexStride,
- const std::vector<bool>& includedRowGroups) {
+ const std::vector<uint64_t>& nextSkippedRows) {
// In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
// groups are selected then marker position is set to the end of range (subset of row groups
// within stripe).
uint64_t endRowInStripe = rowsInCurrentStripe;
- if (!includedRowGroups.empty()) {
- endRowInStripe = currentRowInStripe;
- uint32_t rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
- for (; rg < includedRowGroups.size(); ++rg) {
- if (!includedRowGroups[rg]) {
- break;
- } else {
- endRowInStripe = std::min(rowsInCurrentStripe, (rg + 1) * rowIndexStride);
- }
- }
+ uint64_t groupsInStripe = nextSkippedRows.size();
+ if (groupsInStripe > 0) {
+ auto rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
+ if (rg >= groupsInStripe) return 0;
+ uint64_t nextSkippedRow = nextSkippedRows[rg];
+ if (nextSkippedRow == 0) return 0;
+ endRowInStripe = nextSkippedRow;
}
return std::min(requestedSize, endRowInStripe - currentRowInStripe);
}
@@ -1208,19 +1205,27 @@ namespace orc {
uint64_t RowReaderImpl::advanceToNextRowGroup(uint64_t currentRowInStripe,
uint64_t rowsInCurrentStripe,
uint64_t rowIndexStride,
- const std::vector<bool>& includedRowGroups) {
- if (!includedRowGroups.empty()) {
- uint32_t rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
- for (; rg < includedRowGroups.size(); ++rg) {
- if (includedRowGroups[rg]) {
- return currentRowInStripe;
- } else {
- // advance to start of next row group
- currentRowInStripe = (rg + 1) * rowIndexStride;
- }
- }
- }
- return std::min(currentRowInStripe, rowsInCurrentStripe);
+ const std::vector<uint64_t>& nextSkippedRows) {
+ auto groupsInStripe = nextSkippedRows.size();
+ if (groupsInStripe == 0) {
+ // No PPD, keeps using the current row in stripe
+ return std::min(currentRowInStripe, rowsInCurrentStripe);
+ }
+ auto rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
+ if (rg >= groupsInStripe) {
+ // Points to the end of the stripe
+ return rowsInCurrentStripe;
+ }
+ if (nextSkippedRows[rg] != 0) {
+ // Current row group is selected
+ return currentRowInStripe;
+ }
+ // Advance to the next selected row group
+ while (rg < groupsInStripe && nextSkippedRows[rg] == 0) ++rg;
+ if (rg < groupsInStripe) {
+ return rg * rowIndexStride;
+ }
+ return rowsInCurrentStripe;
}
std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 0facafc44..255bb8d25 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -175,13 +175,13 @@ namespace orc {
uint64_t currentRowInStripe,
uint64_t rowsInCurrentStripe,
uint64_t rowIndexStride,
- const std::vector<bool>& includedRowGroups);
+ const std::vector<uint64_t>& nextSkippedRows);
// Skip non-selected rows
static uint64_t advanceToNextRowGroup(uint64_t currentRowInStripe,
uint64_t rowsInCurrentStripe,
uint64_t rowIndexStride,
- const std::vector<bool>& includedRowGroups);
+ const std::vector<uint64_t>& nextSkippedRows);
friend class TestRowReader_advanceToNextRowGroup_Test;
friend class TestRowReader_computeBatchSize_Test;
diff --git a/c++/src/sargs/SargsApplier.cc b/c++/src/sargs/SargsApplier.cc
index b4a736753..42a554f5c 100644
--- a/c++/src/sargs/SargsApplier.cc
+++ b/c++/src/sargs/SargsApplier.cc
@@ -71,7 +71,7 @@ namespace orc {
// init state of each row group
uint64_t groupsInStripe =
(rowsInStripe + mRowIndexStride - 1) / mRowIndexStride;
- mRowGroups.resize(groupsInStripe, true);
+ mNextSkippedRows.resize(groupsInStripe);
mTotalRowsInStripe = rowsInStripe;
// row indexes do not exist, simply read all rows
@@ -85,7 +85,10 @@ namespace orc {
leaves.size(), TruthValue::YES_NO_NULL);
mHasSelected = false;
mHasSkipped = false;
- for (size_t rowGroup = 0; rowGroup != groupsInStripe; ++rowGroup) {
+ uint64_t nextSkippedRowGroup = groupsInStripe;
+ size_t rowGroup = groupsInStripe;
+ do {
+ --rowGroup;
for (size_t pred = 0; pred != leaves.size(); ++pred) {
uint64_t columnIdx = mFilterColumns[pred];
auto rowIndexIter = rowIndexes.find(columnIdx);
@@ -110,14 +113,21 @@ namespace orc {
}
}
- mRowGroups[rowGroup] = isNeeded(mSearchArgument->evaluate(leafValues));
- mHasSelected = mHasSelected || mRowGroups[rowGroup];
- mHasSkipped = mHasSkipped || (!mRowGroups[rowGroup]);
- }
+ bool needed = isNeeded(mSearchArgument->evaluate(leafValues));
+ if (!needed) {
+ mNextSkippedRows[rowGroup] = 0;
+ nextSkippedRowGroup = rowGroup;
+ } else {
+ mNextSkippedRows[rowGroup] = (nextSkippedRowGroup == groupsInStripe) ?
+ rowsInStripe : (nextSkippedRowGroup * mRowIndexStride);
+ }
+ mHasSelected |= needed;
+ mHasSkipped |= !needed;
+ } while (rowGroup != 0);
// update stats
mStats.first = std::accumulate(
- mRowGroups.cbegin(), mRowGroups.cend(), mStats.first,
+ mNextSkippedRows.cbegin(), mNextSkippedRows.cend(), mStats.first,
[](bool rg, uint64_t s) { return rg ? 1 : 0 + s; });
mStats.second += groupsInStripe;
@@ -156,8 +166,8 @@ namespace orc {
bool ret = evaluateColumnStatistics(stripeStats.colstats());
if (!ret) {
- // reset mRowGroups when the current stripe does not satisfy the PPD
- mRowGroups.clear();
+ // reset mNextSkippedRows when the current stripe does not satisfy the PPD
+ mNextSkippedRows.clear();
}
return ret;
}
diff --git a/c++/src/sargs/SargsApplier.hh b/c++/src/sargs/SargsApplier.hh
index 1842828d5..d8bdf852d 100644
--- a/c++/src/sargs/SargsApplier.hh
+++ b/c++/src/sargs/SargsApplier.hh
@@ -60,10 +60,11 @@ namespace orc {
const std::map<uint32_t, BloomFilterIndex>& bloomFilters);
/**
- * Return a vector of bool for each row group for their selection
- * in the last evaluation
+ * Return a vector of the next skipped row for each RowGroup. Each value is the row id
+ * in stripe. 0 means the current RowGroup is entirely skipped.
+ * Only valid after invoking pickRowGroups().
*/
- const std::vector<bool>& getRowGroups() const { return mRowGroups; }
+ const std::vector<uint64_t>& getNextSkippedRows() const { return mNextSkippedRows; }
/**
* Indicate whether any row group is selected in the last evaluation
@@ -80,8 +81,8 @@ namespace orc {
*/
bool hasSelectedFrom(uint64_t currentRowInStripe) const {
uint64_t rg = currentRowInStripe / mRowIndexStride;
- for (; rg < mRowGroups.size(); ++rg) {
- if (mRowGroups[rg]) {
+ for (; rg < mNextSkippedRows.size(); ++rg) {
+ if (mNextSkippedRows[rg]) {
return true;
}
}
@@ -111,8 +112,10 @@ namespace orc {
// column ids for each predicate leaf in the search argument
std::vector<uint64_t> mFilterColumns;
- // store results of last call of pickRowGroups
- std::vector<bool> mRowGroups;
+ // Map from RowGroup index to the next skipped row of the selected range it
+ // locates. If the RowGroup is not selected, set the value to 0.
+ // Calculated in pickRowGroups().
+ std::vector<uint64_t> mNextSkippedRows;
uint64_t mTotalRowsInStripe;
bool mHasSelected;
bool mHasSkipped;
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 4044e0399..237deab05 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -58,55 +58,55 @@ namespace orc {
TEST(TestRowReader, computeBatchSize) {
uint64_t rowIndexStride = 100;
uint64_t rowsInCurrentStripe = 100 * 8 + 50;
- std::vector<bool> includedRowGroups =
- { false, false, true, true, false, false, true, true, false };
+ std::vector<uint64_t> nextSkippedRows =
+ { 0, 0, 400, 400, 0, 0, 800, 800, 0 };
EXPECT_EQ(0, RowReaderImpl::computeBatchSize(
- 1024, 0, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 1024, 0, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(0, RowReaderImpl::computeBatchSize(
- 1024, 50, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 1024, 50, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(200, RowReaderImpl::computeBatchSize(
- 1024, 200, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 1024, 200, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(150, RowReaderImpl::computeBatchSize(
- 1024, 250, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 1024, 250, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(0, RowReaderImpl::computeBatchSize(
- 1024, 550, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 1024, 550, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(100, RowReaderImpl::computeBatchSize(
- 1024, 700, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 1024, 700, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(50, RowReaderImpl::computeBatchSize(
- 50, 700, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 50, 700, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(0, RowReaderImpl::computeBatchSize(
- 50, 810, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 50, 810, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(0, RowReaderImpl::computeBatchSize(
- 50, 900, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 50, 900, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
}
TEST(TestRowReader, advanceToNextRowGroup) {
uint64_t rowIndexStride = 100;
uint64_t rowsInCurrentStripe = 100 * 8 + 50;
- std::vector<bool> includedRowGroups =
- { false, false, true, true, false, false, true, true, false };
+ std::vector<uint64_t> nextSkippedRows =
+ { 0, 0, 400, 400, 0, 0, 800, 800, 0 };
EXPECT_EQ(200, RowReaderImpl::advanceToNextRowGroup(
- 0, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 0, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(200, RowReaderImpl::advanceToNextRowGroup(
- 150, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 150, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(250, RowReaderImpl::advanceToNextRowGroup(
- 250, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 250, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(350, RowReaderImpl::advanceToNextRowGroup(
- 350, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 350, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(350, RowReaderImpl::advanceToNextRowGroup(
- 350, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 350, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(600, RowReaderImpl::advanceToNextRowGroup(
- 500, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 500, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(699, RowReaderImpl::advanceToNextRowGroup(
- 699, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 699, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(799, RowReaderImpl::advanceToNextRowGroup(
- 799, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 799, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(850, RowReaderImpl::advanceToNextRowGroup(
- 800, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 800, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
EXPECT_EQ(850, RowReaderImpl::advanceToNextRowGroup(
- 900, rowsInCurrentStripe, rowIndexStride, includedRowGroups));
+ 900, rowsInCurrentStripe, rowIndexStride, nextSkippedRows));
}
void CheckFileWithSargs(const char* fileName, const char* softwareVersion) {
diff --git a/c++/test/TestSargsApplier.cc b/c++/test/TestSargsApplier.cc
index 2ec8c3cb8..74fcae297 100644
--- a/c++/test/TestSargsApplier.cc
+++ b/c++/test/TestSargsApplier.cc
@@ -112,12 +112,12 @@ namespace orc {
// evaluate row group index
SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
EXPECT_TRUE(applier.pickRowGroups(4000, rowIndexes, {}));
- std::vector<bool> rowgroups = applier.getRowGroups();
- EXPECT_EQ(4, rowgroups.size());
- EXPECT_EQ(false, rowgroups[0]);
- EXPECT_EQ(false, rowgroups[1]);
- EXPECT_EQ(false, rowgroups[2]);
- EXPECT_EQ(true, rowgroups[3]);
+ const auto& nextSkippedRows = applier.getNextSkippedRows();
+ EXPECT_EQ(4, nextSkippedRows.size());
+ EXPECT_EQ(0, nextSkippedRows[0]);
+ EXPECT_EQ(0, nextSkippedRows[1]);
+ EXPECT_EQ(0, nextSkippedRows[2]);
+ EXPECT_EQ(4000, nextSkippedRows[3]);
}
TEST(TestSargsApplier, testStripeAndFileStats) {
diff --git a/tools/test/TestFileScan.cc b/tools/test/TestFileScan.cc
index 0aaa689d9..c9c1eacc1 100644
--- a/tools/test/TestFileScan.cc
+++ b/tools/test/TestFileScan.cc
@@ -206,7 +206,7 @@ void checkForError(const std::string& filename, const std::string& error_msg) {
std::string error;
EXPECT_EQ(1, runProgram({pgm, filename}, output, error));
EXPECT_EQ("", output);
- EXPECT_NE(std::string::npos, error.find(error_msg));
+ EXPECT_NE(std::string::npos, error.find(error_msg)) << error;
}
TEST (TestFileScan, testErrorHandling) {