You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@orc.apache.org by GitBox <gi...@apache.org> on 2021/03/08 15:19:13 UTC

[GitHub] [orc] wgtmac commented on a change in pull request #648: ORC-751: [C++] Implement Predicate Pushdown for C++ Reader

wgtmac commented on a change in pull request #648:
URL: https://github.com/apache/orc/pull/648#discussion_r589472878



##########
File path: c++/include/orc/Reader.hh
##########
@@ -538,6 +549,12 @@ namespace orc {
      */
     virtual void seekToRow(uint64_t rowNumber) = 0;
 
+    /**
+     * If PPD is enabled, returns true and store number of selected RGs and
+     * number of evaluated RGs into the stats pair; otherwise returns false.
+     */
+    virtual bool getPPDStats(std::pair<uint64_t, uint64_t>& stats) const = 0;

Review comment:
       That sounds good. We should think carefully about which stats are required and what kind of API to provide.

##########
File path: c++/src/Reader.cc
##########
@@ -68,6 +68,12 @@ namespace orc {
       return columnPath.substr(0, columnPath.length() - 1);
   }
 
+  WriterVersion getWriterVersionImpl(const FileContents * contents) {
+    if (!contents->postscript->has_writerversion()) {
+      return WriterVersion_ORIGINAL;

Review comment:
       Sure.

##########
File path: c++/src/Reader.cc
##########
@@ -944,13 +1042,67 @@ namespace orc {
     // update row number
     previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
     currentRowInStripe += rowsToRead;
+
+    // check if we need to advance to next selected row group
+    if (sargsApplier) {
+      uint64_t nextRowToRead = advanceToNextRowGroup(currentRowInStripe,
+                                                     rowsInCurrentStripe,
+                                                     footer->rowindexstride(),
+                                                     sargsApplier->getRowGroups());
+      if (currentRowInStripe != nextRowToRead) {
+        // it is guaranteed to be at start of a row group
+        currentRowInStripe = nextRowToRead;
+        if (currentRowInStripe < rowsInCurrentStripe) {
+          seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()));
+        }
+      }
+    }
+
     if (currentRowInStripe >= rowsInCurrentStripe) {
       currentStripe += 1;
       currentRowInStripe = 0;
     }
     return rowsToRead != 0;
   }
 
+  uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize,
+                                           uint64_t currentRowInStripe,
+                                           uint64_t rowsInCurrentStripe,
+                                           uint64_t rowIndexStride,
+                                           const std::vector<bool>& includedRowGroups) {
+    uint64_t endRowInStripe = rowsInCurrentStripe;

Review comment:
       Added a comment here. The logic is slightly different from the Java side, so I'd prefer to keep it here.

##########
File path: c++/src/sargs/SargsApplier.cc
##########
@@ -25,7 +26,13 @@ namespace orc {
                                     const std::string& colName) {
     for (uint64_t i = 0; i != type.getSubtypeCount(); ++i) {
       if (type.getFieldName(i) == colName) {
-        return type.getSubtype(i)->getColumnId();
+        if (type.getKind() == CHAR || type.getKind() == VARCHAR) {

Review comment:
       Sure

##########
File path: c++/src/Reader.cc
##########
@@ -293,33 +308,43 @@ namespace orc {
     previousRow = rowNumber;
     startNextStripe();
 
-    uint64_t rowsToSkip = currentRowInStripe;
+    // when predicate push down is enabled, above call to startNextStripe()
+    // will move current row to 1st matching row group; here we only need
+    // to deal with the case when PPD is not enabled.
+    if (!sargsApplier) {

Review comment:
       startNextStripe() simply does its job of starting the next stripe; this logic is better kept here.

##########
File path: c++/src/Reader.cc
##########
@@ -892,29 +939,67 @@ namespace orc {
 
   void RowReaderImpl::startNextStripe() {
     reader.reset(); // ColumnReaders use lots of memory; free old memory first
-    currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
-    uint64_t fileLength = contents->stream->getLength();
-    if (currentStripeInfo.offset() + currentStripeInfo.indexlength() +
+    rowIndexes.clear();
+    bloomFilterIndex.clear();
+
+    do {
+      currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
+      uint64_t fileLength = contents->stream->getLength();
+      if (currentStripeInfo.offset() + currentStripeInfo.indexlength() +
         currentStripeInfo.datalength() + currentStripeInfo.footerlength() >= fileLength) {
-      std::stringstream msg;
-      msg << "Malformed StripeInformation at stripe index " << currentStripe << ": fileLength="
-          << fileLength << ", StripeInfo=(offset=" << currentStripeInfo.offset() << ", indexLength="
-          << currentStripeInfo.indexlength() << ", dataLength=" << currentStripeInfo.datalength()
-          << ", footerLength=" << currentStripeInfo.footerlength() << ")";
-      throw ParseError(msg.str());
+        std::stringstream msg;
+        msg << "Malformed StripeInformation at stripe index " << currentStripe << ": fileLength="
+            << fileLength << ", StripeInfo=(offset=" << currentStripeInfo.offset() << ", indexLength="
+            << currentStripeInfo.indexlength() << ", dataLength=" << currentStripeInfo.datalength()
+            << ", footerLength=" << currentStripeInfo.footerlength() << ")";
+        throw ParseError(msg.str());
+      }
+      currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
+      rowsInCurrentStripe = currentStripeInfo.numberofrows();
+
+      if (sargsApplier) {
+        // read row group statistics and bloom filters of current stripe
+        loadStripeIndex();
+
+        // select row groups to read in the current stripe
+        sargsApplier->pickRowGroups(rowsInCurrentStripe,
+                                    rowIndexes,
+                                    bloomFilterIndex);
+        if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
+          // current stripe has at least one row group matching the predicate
+          break;
+        } else {
+          // advance to next stripe when current stripe has no matching rows
+          currentStripe += 1;
+          currentRowInStripe = 0;
+        }
+      }
+    } while (sargsApplier && currentStripe < lastStripe);
+
+    if (currentStripe < lastStripe) {
+      const Timezone& writerTimezone =

Review comment:
       Done

##########
File path: c++/src/Reader.hh
##########
@@ -142,6 +143,30 @@ namespace orc {
 
     // row index of current stripe with column id as the key
     std::unordered_map<uint64_t, proto::RowIndex> rowIndexes;
+    std::map<uint32_t, BloomFilterIndex> bloomFilterIndex;

Review comment:
       It is used to hold the bloom filter index in Reader.cc (e.g., line 378).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org