You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by do...@apache.org on 2022/04/13 17:58:21 UTC
[orc] branch main updated: ORC-969: [C++] Evaluate SearchArguments using file and stripe level stats #1073
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new cbf00628c ORC-969: [C++] Evaluate SearchArguments using file and stripe level stats #1073
cbf00628c is described below
commit cbf00628c4d5ec8287eae419edf7e5dede4d07cf
Author: mingshen.zx <mi...@alibaba-inc.com>
AuthorDate: Tue Mar 29 15:17:20 2022 +0800
ORC-969: [C++] Evaluate SearchArguments using file and stripe level stats #1073
PPD use file stats and stripe stats to filter file content.
To make better use of file and stripe stats.
Use the UT testStripeAndFileStats test.
Closes #1073
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
c++/src/Reader.cc | 59 ++++++++++++++++++---------
c++/src/Reader.hh | 2 +-
c++/src/sargs/SargsApplier.cc | 53 +++++++++++++++++++++++-
c++/src/sargs/SargsApplier.hh | 20 +++++++++
c++/test/TestPredicatePushdown.cc | 85 +++++++++++++++++++++++++++++++++++++++
c++/test/TestSargsApplier.cc | 71 ++++++++++++++++++++++++++++++++
6 files changed, 270 insertions(+), 20 deletions(-)
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 34cc95052..b79f2132c 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -591,8 +591,8 @@ namespace orc {
if (!isMetadataLoaded) {
readMetadata();
}
- return metadata.get() == nullptr ? 0 :
- static_cast<uint64_t>(metadata->stripestats_size());
+ return contents->metadata == nullptr ? 0 :
+ static_cast<uint64_t>(contents->metadata->stripestats_size());
}
std::unique_ptr<StripeInformation>
@@ -767,11 +767,11 @@ namespace orc {
if (!isMetadataLoaded) {
readMetadata();
}
- if (metadata.get() == nullptr) {
+ if (contents->metadata == nullptr) {
throw std::logic_error("No stripe statistics in file");
}
size_t num_cols = static_cast<size_t>(
- metadata->stripestats(
+ contents->metadata->stripestats(
static_cast<int>(stripeIndex)).colstats_size());
std::vector<std::vector<proto::ColumnStatistics> > indexStats(num_cols);
@@ -788,7 +788,7 @@ namespace orc {
getLocalTimezone();
StatContext statContext(hasCorrectStatistics(), &writerTZ);
return std::unique_ptr<StripeStatistics>
- (new StripeStatisticsImpl(metadata->stripestats(static_cast<int>(stripeIndex)),
+ (new StripeStatisticsImpl(contents->metadata->stripestats(static_cast<int>(stripeIndex)),
indexStats, statContext));
}
@@ -831,8 +831,8 @@ namespace orc {
*contents->pool)),
contents->blockSize,
*contents->pool);
- metadata.reset(new proto::Metadata());
- if (!metadata->ParseFromZeroCopyStream(pbStream.get())) {
+ contents->metadata.reset(new proto::Metadata());
+ if (!contents->metadata->ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse the metadata");
}
}
@@ -860,6 +860,10 @@ namespace orc {
std::unique_ptr<RowReader> ReaderImpl::createRowReader(
const RowReaderOptions& opts) const {
+ if (opts.getSearchArgument() && !isMetadataLoaded) {
+ // load stripe statistics for PPD
+ readMetadata();
+ }
return std::unique_ptr<RowReader>(new RowReaderImpl(contents, opts));
}
@@ -1034,6 +1038,15 @@ namespace orc {
rowIndexes.clear();
bloomFilterIndex.clear();
+ // evaluate file statistics if it exists
+ if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer)) {
+ // skip the entire file
+ currentStripe = lastStripe;
+ currentRowInStripe = 0;
+ rowsInCurrentStripe = 0;
+ return;
+ }
+
do {
currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
uint64_t fileLength = contents->stream->getLength();
@@ -1050,16 +1063,26 @@ namespace orc {
rowsInCurrentStripe = currentStripeInfo.numberofrows();
if (sargsApplier) {
- // read row group statistics and bloom filters of current stripe
- loadStripeIndex();
-
- // select row groups to read in the current stripe
- sargsApplier->pickRowGroups(rowsInCurrentStripe,
- rowIndexes,
- bloomFilterIndex);
- if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
- // current stripe has at least one row group matching the predicate
- break;
+ bool isStripeNeeded = true;
+ if (contents->metadata) {
+ const auto& currentStripeStats =
+ contents->metadata->stripestats(static_cast<int>(currentStripe));
+ // skip this stripe after stats fail to satisfy sargs
+ isStripeNeeded = sargsApplier->evaluateStripeStatistics(currentStripeStats);
+ }
+
+ if (isStripeNeeded) {
+ // read row group statistics and bloom filters of current stripe
+ loadStripeIndex();
+
+ // select row groups to read in the current stripe
+ sargsApplier->pickRowGroups(rowsInCurrentStripe,
+ rowIndexes,
+ bloomFilterIndex);
+ if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
+ // current stripe has at least one row group matching the predicate
+ break;
+ }
} else {
// advance to next stripe when current stripe has no matching rows
currentStripe += 1;
@@ -1113,7 +1136,7 @@ namespace orc {
uint64_t rowsToRead =
std::min(static_cast<uint64_t>(data.capacity),
rowsInCurrentStripe - currentRowInStripe);
- if (sargsApplier) {
+ if (sargsApplier && rowsToRead > 0) {
rowsToRead = computeBatchSize(rowsToRead,
currentRowInStripe,
rowsInCurrentStripe,
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 54dbe339b..0facafc44 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -66,6 +66,7 @@ namespace orc {
/// Decimal64 in ORCv2 uses RLE to store values. This flag indicates whether
/// this new encoding is used.
bool isDecimalAsLong;
+ std::unique_ptr<proto::Metadata> metadata;
};
proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -254,7 +255,6 @@ namespace orc {
std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;
// metadata
- mutable std::unique_ptr<proto::Metadata> metadata;
mutable bool isMetadataLoaded;
public:
/**
diff --git a/c++/src/sargs/SargsApplier.cc b/c++/src/sargs/SargsApplier.cc
index f99499e77..b4a736753 100644
--- a/c++/src/sargs/SargsApplier.cc
+++ b/c++/src/sargs/SargsApplier.cc
@@ -46,7 +46,9 @@ namespace orc {
, mSearchArgument(searchArgument)
, mRowIndexStride(rowIndexStride)
, mWriterVersion(writerVersion)
- , mStats(0, 0) {
+ , mStats(0, 0)
+ , mHasEvaluatedFileStats(false)
+ , mFileStatsEvalResult(true) {
const SearchArgumentImpl * sargs =
dynamic_cast<const SearchArgumentImpl *>(mSearchArgument);
@@ -122,4 +124,53 @@ namespace orc {
return mHasSelected;
}
+ bool SargsApplier::evaluateColumnStatistics(
+ const PbColumnStatistics& colStats) const {
+ const SearchArgumentImpl * sargs =
+ dynamic_cast<const SearchArgumentImpl *>(mSearchArgument);
+ if (sargs == nullptr) {
+ throw InvalidArgument("Failed to cast to SearchArgumentImpl");
+ }
+
+ const std::vector<PredicateLeaf>& leaves = sargs->getLeaves();
+ std::vector<TruthValue> leafValues(
+ leaves.size(), TruthValue::YES_NO_NULL);
+
+ for (size_t pred = 0; pred != leaves.size(); ++pred) {
+ uint64_t columnId = mFilterColumns[pred];
+ if (columnId != INVALID_COLUMN_ID &&
+ colStats.size() > static_cast<int>(columnId)) {
+ leafValues[pred] = leaves[pred].evaluate(
+ mWriterVersion, colStats.Get(static_cast<int>(columnId)), nullptr);
+ }
+ }
+
+ return isNeeded(mSearchArgument->evaluate(leafValues));
+ }
+
+ bool SargsApplier::evaluateStripeStatistics(
+ const proto::StripeStatistics& stripeStats) {
+ if (stripeStats.colstats_size() == 0) {
+ return true;
+ }
+
+ bool ret = evaluateColumnStatistics(stripeStats.colstats());
+ if (!ret) {
+ // reset mRowGroups when the current stripe does not satisfy the PPD
+ mRowGroups.clear();
+ }
+ return ret;
+ }
+
+ bool SargsApplier::evaluateFileStatistics(const proto::Footer& footer) {
+ if (!mHasEvaluatedFileStats) {
+ if (footer.statistics_size() == 0) {
+ mFileStatsEvalResult = true;
+ } else {
+ mFileStatsEvalResult = evaluateColumnStatistics(footer.statistics());
+ }
+ mHasEvaluatedFileStats = true;
+ }
+ return mFileStatsEvalResult;
+ }
}
diff --git a/c++/src/sargs/SargsApplier.hh b/c++/src/sargs/SargsApplier.hh
index 39650f1fa..1842828d5 100644
--- a/c++/src/sargs/SargsApplier.hh
+++ b/c++/src/sargs/SargsApplier.hh
@@ -37,6 +37,18 @@ namespace orc {
uint64_t rowIndexStride,
WriterVersion writerVersion);
+ /**
+ * Evaluate search argument on file statistics
+ * @return true if file statistics satisfy the sargs
+ */
+ bool evaluateFileStatistics(const proto::Footer& footer);
+
+ /**
+ * Evaluate search argument on stripe statistics
+ * @return true if stripe statistics satisfy the sargs
+ */
+ bool evaluateStripeStatistics(const proto::StripeStatistics& stripeStats);
+
/**
* TODO: use proto::RowIndex and proto::BloomFilter to do the evaluation
* Pick the row groups that we need to load from the current stripe.
@@ -81,6 +93,11 @@ namespace orc {
}
private:
+ // evaluate column statistics in the form of protobuf::RepeatedPtrField
+ typedef ::google::protobuf::RepeatedPtrField<proto::ColumnStatistics>
+ PbColumnStatistics;
+ bool evaluateColumnStatistics(const PbColumnStatistics& colStats) const;
+
friend class TestSargsApplier_findColumnTest_Test;
friend class TestSargsApplier_findArrayColumnTest_Test;
friend class TestSargsApplier_findMapColumnTest_Test;
@@ -101,6 +118,9 @@ namespace orc {
bool mHasSkipped;
// keep stats of selected RGs and evaluated RGs
std::pair<uint64_t, uint64_t> mStats;
+ // store result of file stats evaluation
+ bool mHasEvaluatedFileStats;
+ bool mFileStatsEvalResult;
};
}
diff --git a/c++/test/TestPredicatePushdown.cc b/c++/test/TestPredicatePushdown.cc
index 41d0b532f..6bd81f091 100644
--- a/c++/test/TestPredicatePushdown.cc
+++ b/c++/test/TestPredicatePushdown.cc
@@ -238,4 +238,89 @@ namespace orc {
TestNoRowsSelected(reader.get());
TestOrPredicates(reader.get());
}
+
+ void TestNoRowsSelectedWithFileStats(Reader* reader) {
+ std::unique_ptr<SearchArgument> sarg =
+ SearchArgumentFactory::newBuilder()
+ ->startAnd()
+ .lessThan("col1", PredicateDataType::LONG,
+ Literal(static_cast<int64_t>(0)))
+ .end()
+ .build();
+
+ RowReaderOptions rowReaderOpts;
+ rowReaderOpts.searchArgument(std::move(sarg));
+ auto rowReader = reader->createRowReader(rowReaderOpts);
+
+ auto readBatch = rowReader->createRowBatch(2000);
+ EXPECT_EQ(false, rowReader->next(*readBatch));
+ }
+
+ void TestSelectedWithStripeStats(Reader* reader) {
+ std::unique_ptr<SearchArgument> sarg =
+ SearchArgumentFactory::newBuilder()
+ ->startAnd()
+ .between("col1",
+ PredicateDataType::LONG,
+ Literal(static_cast<int64_t>(3500)),
+ Literal(static_cast<int64_t>(7000)))
+ .end()
+ .build();
+
+ RowReaderOptions rowReaderOpts;
+ rowReaderOpts.searchArgument(std::move(sarg));
+ auto rowReader = reader->createRowReader(rowReaderOpts);
+
+ auto readBatch = rowReader->createRowBatch(2000);
+ EXPECT_EQ(true, rowReader->next(*readBatch));
+ // test previous row number
+ EXPECT_EQ(3500, rowReader->getRowNumber());
+ EXPECT_EQ(2000, readBatch->numElements);
+ auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+ auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+ for (uint64_t i = 0; i < 2000; ++i) {
+ EXPECT_EQ(i + 3500 , batch1.data[i]);
+ }
+ }
+
+ TEST(TestPredicatePushdown, testStripeAndFileStats) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool * pool = getDefaultPool();
+ auto type = std::unique_ptr<Type>(Type::buildTypeFromString(
+ "struct<col1:bigint>"));
+ WriterOptions options;
+ options.setStripeSize(1)
+ .setCompressionBlockSize(1024)
+ .setCompression(CompressionKind_NONE)
+ .setMemoryPool(pool)
+ .setRowIndexStride(1000);
+
+ auto writer = createWriter(*type, &memStream, options);
+ auto batch = writer->createRowBatch(3500);
+ auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+ auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+
+ // stripe 1 : 0 <= col1 < 3500
+ // stripe 2 : 3500<= col1 < 7000
+ uint64_t stripeCount = 2;
+ for (uint64_t currentStripe = 0; currentStripe < stripeCount; ++currentStripe) {
+ for (uint64_t i = 0; i < 3500; ++i) {
+ longBatch.data[i] = static_cast<int64_t>(i + currentStripe * 3500);
+ }
+ structBatch.numElements = 3500;
+ longBatch.numElements = 3500;
+ writer->add(*batch);
+ }
+ writer->close();
+ std::unique_ptr<InputStream> inStream(new MemoryInputStream (
+ memStream.getData(), memStream.getLength()));
+ ReaderOptions readerOptions;
+ readerOptions.setMemoryPool(*pool);
+ std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
+ EXPECT_EQ(7000, reader->getNumberOfRows());
+ EXPECT_EQ(stripeCount, reader->getNumberOfStripes());
+
+ TestNoRowsSelectedWithFileStats(reader.get());
+ TestSelectedWithStripeStats(reader.get());
+ }
} // namespace orc
diff --git a/c++/test/TestSargsApplier.cc b/c++/test/TestSargsApplier.cc
index 151635897..2ec8c3cb8 100644
--- a/c++/test/TestSargsApplier.cc
+++ b/c++/test/TestSargsApplier.cc
@@ -120,4 +120,75 @@ namespace orc {
EXPECT_EQ(true, rowgroups[3]);
}
+ TEST(TestSargsApplier, testStripeAndFileStats) {
+ auto type = std::unique_ptr<Type>(
+ Type::buildTypeFromString("struct<x:int,y:int>"));
+ auto sarg = SearchArgumentFactory::newBuilder()
+ ->startAnd()
+ .equals(
+ "x",
+ PredicateDataType::LONG,
+ Literal(static_cast<int64_t>(20)))
+ .equals(
+ "y",
+ PredicateDataType::LONG,
+ Literal(static_cast<int64_t>(40)))
+ .end()
+ .build();
+ // Test stripe stats 0 <= x <= 10 and 0 <= y <= 50
+ {
+ orc::proto::StripeStatistics stripeStats;
+ proto::ColumnStatistics structStatistics;
+ structStatistics.set_hasnull(false);
+ *stripeStats.add_colstats() = structStatistics;
+ *stripeStats.add_colstats() = createIntStats(0L, 10L);
+ *stripeStats.add_colstats() = createIntStats(0L, 50L);
+ SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+ EXPECT_FALSE(applier.evaluateStripeStatistics(stripeStats));
+ }
+ // Test stripe stats 0 <= x <= 50 and 0 <= y <= 50
+ {
+ orc::proto::StripeStatistics stripeStats;
+ proto::ColumnStatistics structStatistics;
+ structStatistics.set_hasnull(false);
+ *stripeStats.add_colstats() = structStatistics;
+ *stripeStats.add_colstats() = createIntStats(0L, 50L);
+ *stripeStats.add_colstats() = createIntStats(0L, 50L);
+ SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+ EXPECT_TRUE(applier.evaluateStripeStatistics(stripeStats));
+ }
+ // Test file stats 0 <= x <= 10 and 0 <= y <= 50
+ {
+ orc::proto::Footer footer;
+ proto::ColumnStatistics structStatistics;
+ structStatistics.set_hasnull(false);
+ *footer.add_statistics() = structStatistics;
+ *footer.add_statistics() = createIntStats(0L, 10L);
+ *footer.add_statistics() = createIntStats(0L, 50L);
+ SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+ EXPECT_FALSE(applier.evaluateFileStatistics(footer));
+ }
+ // Test file stats 0 <= x <= 50 and 0 <= y <= 30
+ {
+ orc::proto::Footer footer;
+ proto::ColumnStatistics structStatistics;
+ structStatistics.set_hasnull(false);
+ *footer.add_statistics() = structStatistics;
+ *footer.add_statistics() = createIntStats(0L, 50L);
+ *footer.add_statistics() = createIntStats(0L, 30L);
+ SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+ EXPECT_FALSE(applier.evaluateFileStatistics(footer));
+ }
+ // Test file stats 0 <= x <= 50 and 0 <= y <= 50
+ {
+ orc::proto::Footer footer;
+ proto::ColumnStatistics structStatistics;
+ structStatistics.set_hasnull(false);
+ *footer.add_statistics() = structStatistics;
+ *footer.add_statistics() = createIntStats(0L, 50L);
+ *footer.add_statistics() = createIntStats(0L, 50L);
+ SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+ EXPECT_TRUE(applier.evaluateFileStatistics(footer));
+ }
+ }
} // namespace orc