You are viewing a plain-text version of this content; the canonical link to it (originally a "here" hyperlink) is not preserved in this rendering.
Posted to commits@orc.apache.org by do...@apache.org on 2022/04/13 17:58:21 UTC

[orc] branch main updated: ORC-969: [C++] Evaluate SearchArguments using file and stripe level stats #1073

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new cbf00628c ORC-969: [C++] Evaluate SearchArguments using file and stripe level stats #1073
cbf00628c is described below

commit cbf00628c4d5ec8287eae419edf7e5dede4d07cf
Author: mingshen.zx <mi...@alibaba-inc.com>
AuthorDate: Tue Mar 29 15:17:20 2022 +0800

    ORC-969: [C++] Evaluate SearchArguments using file and stripe level stats #1073
    
    Predicate pushdown (PPD) now uses file-level and stripe-level statistics to filter out file content.
    
    This makes better use of the file and stripe statistics.
    
    Tested by the new testStripeAndFileStats unit tests.
    
    Closes #1073
    
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 c++/src/Reader.cc                 | 59 ++++++++++++++++++---------
 c++/src/Reader.hh                 |  2 +-
 c++/src/sargs/SargsApplier.cc     | 53 +++++++++++++++++++++++-
 c++/src/sargs/SargsApplier.hh     | 20 +++++++++
 c++/test/TestPredicatePushdown.cc | 85 +++++++++++++++++++++++++++++++++++++++
 c++/test/TestSargsApplier.cc      | 71 ++++++++++++++++++++++++++++++++
 6 files changed, 270 insertions(+), 20 deletions(-)

diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 34cc95052..b79f2132c 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -591,8 +591,8 @@ namespace orc {
     if (!isMetadataLoaded) {
       readMetadata();
     }
-    return metadata.get() == nullptr ? 0 :
-      static_cast<uint64_t>(metadata->stripestats_size());
+    return contents->metadata == nullptr ? 0 :
+      static_cast<uint64_t>(contents->metadata->stripestats_size());
   }
 
   std::unique_ptr<StripeInformation>
@@ -767,11 +767,11 @@ namespace orc {
     if (!isMetadataLoaded) {
       readMetadata();
     }
-    if (metadata.get() == nullptr) {
+    if (contents->metadata == nullptr) {
       throw std::logic_error("No stripe statistics in file");
     }
     size_t num_cols = static_cast<size_t>(
-                          metadata->stripestats(
+                          contents->metadata->stripestats(
                               static_cast<int>(stripeIndex)).colstats_size());
     std::vector<std::vector<proto::ColumnStatistics> > indexStats(num_cols);
 
@@ -788,7 +788,7 @@ namespace orc {
         getLocalTimezone();
     StatContext statContext(hasCorrectStatistics(), &writerTZ);
     return std::unique_ptr<StripeStatistics>
-           (new StripeStatisticsImpl(metadata->stripestats(static_cast<int>(stripeIndex)),
+           (new StripeStatisticsImpl(contents->metadata->stripestats(static_cast<int>(stripeIndex)),
                                                    indexStats, statContext));
   }
 
@@ -831,8 +831,8 @@ namespace orc {
                                                           *contents->pool)),
                            contents->blockSize,
                            *contents->pool);
-      metadata.reset(new proto::Metadata());
-      if (!metadata->ParseFromZeroCopyStream(pbStream.get())) {
+      contents->metadata.reset(new proto::Metadata());
+      if (!contents->metadata->ParseFromZeroCopyStream(pbStream.get())) {
         throw ParseError("Failed to parse the metadata");
       }
     }
@@ -860,6 +860,10 @@ namespace orc {
 
   std::unique_ptr<RowReader> ReaderImpl::createRowReader(
            const RowReaderOptions& opts) const {
+    if (opts.getSearchArgument() && !isMetadataLoaded) {
+      // load stripe statistics for PPD
+      readMetadata();
+    }
     return std::unique_ptr<RowReader>(new RowReaderImpl(contents, opts));
   }
 
@@ -1034,6 +1038,15 @@ namespace orc {
     rowIndexes.clear();
     bloomFilterIndex.clear();
 
+    // evaluate file statistics if it exists
+    if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer)) {
+      // skip the entire file
+      currentStripe = lastStripe;
+      currentRowInStripe = 0;
+      rowsInCurrentStripe = 0;
+      return;
+    }
+
     do {
       currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
       uint64_t fileLength = contents->stream->getLength();
@@ -1050,16 +1063,31 @@ namespace orc {
       rowsInCurrentStripe = currentStripeInfo.numberofrows();
 
       if (sargsApplier) {
-        // read row group statistics and bloom filters of current stripe
-        loadStripeIndex();
-
-        // select row groups to read in the current stripe
-        sargsApplier->pickRowGroups(rowsInCurrentStripe,
-                                    rowIndexes,
-                                    bloomFilterIndex);
-        if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
-          // current stripe has at least one row group matching the predicate
-          break;
+        bool isStripeNeeded = true;
+        if (contents->metadata) {
+          const auto& currentStripeStats =
+            contents->metadata->stripestats(static_cast<int>(currentStripe));
+          // skip this stripe after stats fail to satisfy sargs
+          isStripeNeeded = sargsApplier->evaluateStripeStatistics(currentStripeStats);
+        }
+
+        if (isStripeNeeded) {
+          // read row group statistics and bloom filters of current stripe
+          loadStripeIndex();
+
+          // select row groups to read in the current stripe
+          sargsApplier->pickRowGroups(rowsInCurrentStripe,
+                                      rowIndexes,
+                                      bloomFilterIndex);
+          if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
+            // current stripe has at least one row group matching the predicate
+            break;
+          }
+          // stripe stats may pass while no row group does; mark the stripe as
+          // unneeded so the loop advances instead of re-reading it forever
+          isStripeNeeded = false;
-        } else {
+        }
+
+        if (!isStripeNeeded) {
           // advance to next stripe when current stripe has no matching rows
           currentStripe += 1;
@@ -1113,7 +1136,7 @@ namespace orc {
     uint64_t rowsToRead =
       std::min(static_cast<uint64_t>(data.capacity),
                rowsInCurrentStripe - currentRowInStripe);
-    if (sargsApplier) {
+    if (sargsApplier && rowsToRead > 0) {
       rowsToRead = computeBatchSize(rowsToRead,
                                     currentRowInStripe,
                                     rowsInCurrentStripe,
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 54dbe339b..0facafc44 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -66,6 +66,7 @@ namespace orc {
     /// Decimal64 in ORCv2 uses RLE to store values. This flag indicates whether
     /// this new encoding is used.
     bool isDecimalAsLong;
+    std::unique_ptr<proto::Metadata> metadata;
   };
 
   proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -254,7 +255,6 @@ namespace orc {
                                std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;
 
     // metadata
-    mutable std::unique_ptr<proto::Metadata> metadata;
     mutable bool isMetadataLoaded;
    public:
     /**
diff --git a/c++/src/sargs/SargsApplier.cc b/c++/src/sargs/SargsApplier.cc
index f99499e77..b4a736753 100644
--- a/c++/src/sargs/SargsApplier.cc
+++ b/c++/src/sargs/SargsApplier.cc
@@ -46,7 +46,9 @@ namespace orc {
                              , mSearchArgument(searchArgument)
                              , mRowIndexStride(rowIndexStride)
                              , mWriterVersion(writerVersion)
-                             , mStats(0, 0) {
+                             , mStats(0, 0)
+                             , mHasEvaluatedFileStats(false)
+                             , mFileStatsEvalResult(true) {
     const SearchArgumentImpl * sargs =
       dynamic_cast<const SearchArgumentImpl *>(mSearchArgument);
 
@@ -122,4 +124,53 @@ namespace orc {
     return mHasSelected;
   }
 
+  bool SargsApplier::evaluateColumnStatistics(
+                                    const PbColumnStatistics& colStats) const {
+    const SearchArgumentImpl * sargs =
+      dynamic_cast<const SearchArgumentImpl *>(mSearchArgument);
+    if (sargs == nullptr) {
+      throw InvalidArgument("Failed to cast to SearchArgumentImpl");
+    }
+
+    const std::vector<PredicateLeaf>& leaves = sargs->getLeaves();
+    std::vector<TruthValue> leafValues(
+      leaves.size(), TruthValue::YES_NO_NULL);
+
+    for (size_t pred = 0; pred != leaves.size(); ++pred) {
+      uint64_t columnId = mFilterColumns[pred];
+      if (columnId != INVALID_COLUMN_ID &&
+          colStats.size() > static_cast<int>(columnId)) {
+        leafValues[pred] = leaves[pred].evaluate(
+          mWriterVersion, colStats.Get(static_cast<int>(columnId)), nullptr);
+      }
+    }
+
+    return isNeeded(mSearchArgument->evaluate(leafValues));
+  }
+
+  bool SargsApplier::evaluateStripeStatistics(
+                            const proto::StripeStatistics& stripeStats) {
+    if (stripeStats.colstats_size() == 0) {
+      return true;
+    }
+
+    bool ret = evaluateColumnStatistics(stripeStats.colstats());
+    if (!ret) {
+      // reset mRowGroups when the current stripe does not satisfy the PPD
+      mRowGroups.clear();
+    }
+    return ret;
+  }
+
+  bool SargsApplier::evaluateFileStatistics(const proto::Footer& footer) {
+    if (!mHasEvaluatedFileStats) {
+      if (footer.statistics_size() == 0) {
+        mFileStatsEvalResult = true;
+      } else {
+        mFileStatsEvalResult = evaluateColumnStatistics(footer.statistics());
+      }
+      mHasEvaluatedFileStats = true;
+    }
+    return mFileStatsEvalResult;
+  }
 }
diff --git a/c++/src/sargs/SargsApplier.hh b/c++/src/sargs/SargsApplier.hh
index 39650f1fa..1842828d5 100644
--- a/c++/src/sargs/SargsApplier.hh
+++ b/c++/src/sargs/SargsApplier.hh
@@ -37,6 +37,18 @@ namespace orc {
                  uint64_t rowIndexStride,
                  WriterVersion writerVersion);
 
+    /**
+     * Evaluate search argument on file statistics
+     * @return true if file statistics satisfy the sargs
+     */
+    bool evaluateFileStatistics(const proto::Footer& footer);
+
+    /**
+     * Evaluate search argument on stripe statistics
+     * @return true if stripe statistics satisfy the sargs
+     */
+    bool evaluateStripeStatistics(const proto::StripeStatistics& stripeStats);
+
     /**
      * TODO: use proto::RowIndex and proto::BloomFilter to do the evaluation
      * Pick the row groups that we need to load from the current stripe.
@@ -81,6 +93,11 @@ namespace orc {
     }
 
   private:
+    // evaluate column statistics in the form of protobuf::RepeatedPtrField
+    typedef ::google::protobuf::RepeatedPtrField<proto::ColumnStatistics>
+      PbColumnStatistics;
+    bool evaluateColumnStatistics(const PbColumnStatistics& colStats) const;
+
     friend class TestSargsApplier_findColumnTest_Test;
     friend class TestSargsApplier_findArrayColumnTest_Test;
     friend class TestSargsApplier_findMapColumnTest_Test;
@@ -101,6 +118,9 @@ namespace orc {
     bool mHasSkipped;
     // keep stats of selected RGs and evaluated RGs
     std::pair<uint64_t, uint64_t> mStats;
+    // store result of file stats evaluation
+    bool mHasEvaluatedFileStats;
+    bool mFileStatsEvalResult;
   };
 
 }
diff --git a/c++/test/TestPredicatePushdown.cc b/c++/test/TestPredicatePushdown.cc
index 41d0b532f..6bd81f091 100644
--- a/c++/test/TestPredicatePushdown.cc
+++ b/c++/test/TestPredicatePushdown.cc
@@ -238,4 +238,89 @@ namespace orc {
     TestNoRowsSelected(reader.get());
     TestOrPredicates(reader.get());
   }
+
+  void TestNoRowsSelectedWithFileStats(Reader* reader) {
+    std::unique_ptr<SearchArgument> sarg =
+      SearchArgumentFactory::newBuilder()
+        ->startAnd()
+        .lessThan("col1", PredicateDataType::LONG,
+                  Literal(static_cast<int64_t>(0)))
+        .end()
+        .build();
+
+    RowReaderOptions rowReaderOpts;
+    rowReaderOpts.searchArgument(std::move(sarg));
+    auto rowReader = reader->createRowReader(rowReaderOpts);
+
+    auto readBatch = rowReader->createRowBatch(2000);
+    EXPECT_EQ(false, rowReader->next(*readBatch));
+  }
+
+  void TestSelectedWithStripeStats(Reader* reader) {
+    std::unique_ptr<SearchArgument> sarg =
+      SearchArgumentFactory::newBuilder()
+          ->startAnd()
+          .between("col1",
+                   PredicateDataType::LONG,
+                   Literal(static_cast<int64_t>(3500)),
+                   Literal(static_cast<int64_t>(7000)))
+          .end()
+          .build();
+
+    RowReaderOptions rowReaderOpts;
+    rowReaderOpts.searchArgument(std::move(sarg));
+    auto rowReader = reader->createRowReader(rowReaderOpts);
+
+    auto readBatch = rowReader->createRowBatch(2000);
+    EXPECT_EQ(true, rowReader->next(*readBatch));
+    // test previous row number
+    EXPECT_EQ(3500, rowReader->getRowNumber());
+    EXPECT_EQ(2000, readBatch->numElements);
+    auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+    auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+    for (uint64_t i = 0; i < 2000; ++i) {
+      EXPECT_EQ(i + 3500 , batch1.data[i]);
+    }
+  }
+
+  TEST(TestPredicatePushdown, testStripeAndFileStats) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    auto type = std::unique_ptr<Type>(Type::buildTypeFromString(
+      "struct<col1:bigint>"));
+    WriterOptions options;
+    options.setStripeSize(1)
+      .setCompressionBlockSize(1024)
+      .setCompression(CompressionKind_NONE)
+      .setMemoryPool(pool)
+      .setRowIndexStride(1000);
+
+    auto writer = createWriter(*type, &memStream, options);
+    auto batch = writer->createRowBatch(3500);
+    auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+    auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+
+    // stripe 1 : 0 <= col1 < 3500
+    // stripe 2 : 3500<= col1 < 7000
+    uint64_t stripeCount = 2;
+    for (uint64_t currentStripe = 0; currentStripe < stripeCount; ++currentStripe) {
+      for (uint64_t i = 0; i < 3500; ++i) {
+        longBatch.data[i] = static_cast<int64_t>(i + currentStripe * 3500);
+      }
+      structBatch.numElements = 3500;
+      longBatch.numElements = 3500;
+      writer->add(*batch);
+    }
+    writer->close();
+    std::unique_ptr<InputStream> inStream(new MemoryInputStream (
+      memStream.getData(), memStream.getLength()));
+    ReaderOptions readerOptions;
+    readerOptions.setMemoryPool(*pool);
+    std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
+    EXPECT_EQ(7000, reader->getNumberOfRows());
+    EXPECT_EQ(stripeCount, reader->getNumberOfStripes());
+
+    TestNoRowsSelectedWithFileStats(reader.get());
+    TestSelectedWithStripeStats(reader.get());
+  }
 }  // namespace orc
diff --git a/c++/test/TestSargsApplier.cc b/c++/test/TestSargsApplier.cc
index 151635897..2ec8c3cb8 100644
--- a/c++/test/TestSargsApplier.cc
+++ b/c++/test/TestSargsApplier.cc
@@ -120,4 +120,75 @@ namespace orc {
     EXPECT_EQ(true, rowgroups[3]);
   }
 
+  TEST(TestSargsApplier, testStripeAndFileStats) {
+    auto type = std::unique_ptr<Type>(
+      Type::buildTypeFromString("struct<x:int,y:int>"));
+    auto sarg = SearchArgumentFactory::newBuilder()
+      ->startAnd()
+      .equals(
+              "x",
+              PredicateDataType::LONG,
+              Literal(static_cast<int64_t>(20)))
+      .equals(
+              "y",
+              PredicateDataType::LONG,
+              Literal(static_cast<int64_t>(40)))
+      .end()
+      .build();
+    // Test stripe stats 0 <= x <= 10 and 0 <= y <= 50
+    {
+      orc::proto::StripeStatistics stripeStats;
+      proto::ColumnStatistics structStatistics;
+      structStatistics.set_hasnull(false);
+      *stripeStats.add_colstats() = structStatistics;
+      *stripeStats.add_colstats() = createIntStats(0L, 10L);
+      *stripeStats.add_colstats() = createIntStats(0L, 50L);
+      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+      EXPECT_FALSE(applier.evaluateStripeStatistics(stripeStats));
+    }
+    // Test stripe stats 0 <= x <= 50 and 0 <= y <= 50
+    {
+      orc::proto::StripeStatistics stripeStats;
+      proto::ColumnStatistics structStatistics;
+      structStatistics.set_hasnull(false);
+      *stripeStats.add_colstats() = structStatistics;
+      *stripeStats.add_colstats() = createIntStats(0L, 50L);
+      *stripeStats.add_colstats() = createIntStats(0L, 50L);
+      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+      EXPECT_TRUE(applier.evaluateStripeStatistics(stripeStats));
+    }
+    // Test file stats 0 <= x <= 10 and 0 <= y <= 50
+    {
+      orc::proto::Footer footer;
+      proto::ColumnStatistics structStatistics;
+      structStatistics.set_hasnull(false);
+      *footer.add_statistics() = structStatistics;
+      *footer.add_statistics() = createIntStats(0L, 10L);
+      *footer.add_statistics() = createIntStats(0L, 50L);
+      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+      EXPECT_FALSE(applier.evaluateFileStatistics(footer));
+    }
+    // Test file stats 0 <= x <= 50 and 0 <= y <= 30
+    {
+      orc::proto::Footer footer;
+      proto::ColumnStatistics structStatistics;
+      structStatistics.set_hasnull(false);
+      *footer.add_statistics() = structStatistics;
+      *footer.add_statistics() = createIntStats(0L, 50L);
+      *footer.add_statistics() = createIntStats(0L, 30L);
+      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+      EXPECT_FALSE(applier.evaluateFileStatistics(footer));
+    }
+    // Test file stats 0 <= x <= 50 and 0 <= y <= 50
+    {
+      orc::proto::Footer footer;
+      proto::ColumnStatistics structStatistics;
+      structStatistics.set_hasnull(false);
+      *footer.add_statistics() = structStatistics;
+      *footer.add_statistics() = createIntStats(0L, 50L);
+      *footer.add_statistics() = createIntStats(0L, 50L);
+      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+      EXPECT_TRUE(applier.evaluateFileStatistics(footer));
+    }
+  }
 }  // namespace orc