You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@orc.apache.org by GitBox <gi...@apache.org> on 2021/03/03 12:06:43 UTC

[GitHub] [orc] pgaref commented on a change in pull request #648: ORC-751: [C++] Implement Predicate Pushdown for C++ Reader

pgaref commented on a change in pull request #648:
URL: https://github.com/apache/orc/pull/648#discussion_r586336023



##########
File path: c++/include/orc/Reader.hh
##########
@@ -538,6 +549,12 @@ namespace orc {
      */
     virtual void seekToRow(uint64_t rowNumber) = 0;
 
+    /**
+     * If PPD is enabled, returns true and store number of selected RGs and
+     * number of evaluated RGs into the stats pair; otherwise returns false.
+     */
+    virtual bool getPPDStats(std::pair<uint64_t, uint64_t>& stats) const = 0;

Review comment:
       I like this -- just wondering if we should do it in a follow-up and create a Java equivalent?
   Ideally, one would like a complete set of stats, such as: total vs. read stripes, total vs. read row groups, and total vs. read rows/bytes.

##########
File path: c++/src/Reader.cc
##########
@@ -68,6 +68,12 @@ namespace orc {
       return columnPath.substr(0, columnPath.length() - 1);
   }
 
+  WriterVersion getWriterVersionImpl(const FileContents * contents) {
+    if (!contents->postscript->has_writerversion()) {
+      return WriterVersion_ORIGINAL;

Review comment:
       Just a reminder to check why this is not FUTURE, as in the Java equivalent.

##########
File path: c++/src/Reader.cc
##########
@@ -293,33 +308,43 @@ namespace orc {
     previousRow = rowNumber;
     startNextStripe();
 
-    uint64_t rowsToSkip = currentRowInStripe;
+    // when predicate push down is enabled, above call to startNextStripe()
+    // will move current row to 1st matching row group; here we only need
+    // to deal with the case when PPD is not enabled.
+    if (!sargsApplier) {

Review comment:
       Maybe move this part into the startNextStripe() method?

##########
File path: c++/src/Reader.cc
##########
@@ -892,29 +939,67 @@ namespace orc {
 
   void RowReaderImpl::startNextStripe() {
     reader.reset(); // ColumnReaders use lots of memory; free old memory first
-    currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
-    uint64_t fileLength = contents->stream->getLength();
-    if (currentStripeInfo.offset() + currentStripeInfo.indexlength() +
+    rowIndexes.clear();
+    bloomFilterIndex.clear();
+
+    do {
+      currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
+      uint64_t fileLength = contents->stream->getLength();
+      if (currentStripeInfo.offset() + currentStripeInfo.indexlength() +
         currentStripeInfo.datalength() + currentStripeInfo.footerlength() >= fileLength) {
-      std::stringstream msg;
-      msg << "Malformed StripeInformation at stripe index " << currentStripe << ": fileLength="
-          << fileLength << ", StripeInfo=(offset=" << currentStripeInfo.offset() << ", indexLength="
-          << currentStripeInfo.indexlength() << ", dataLength=" << currentStripeInfo.datalength()
-          << ", footerLength=" << currentStripeInfo.footerlength() << ")";
-      throw ParseError(msg.str());
+        std::stringstream msg;
+        msg << "Malformed StripeInformation at stripe index " << currentStripe << ": fileLength="
+            << fileLength << ", StripeInfo=(offset=" << currentStripeInfo.offset() << ", indexLength="
+            << currentStripeInfo.indexlength() << ", dataLength=" << currentStripeInfo.datalength()
+            << ", footerLength=" << currentStripeInfo.footerlength() << ")";
+        throw ParseError(msg.str());
+      }
+      currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
+      rowsInCurrentStripe = currentStripeInfo.numberofrows();
+
+      if (sargsApplier) {
+        // read row group statistics and bloom filters of current stripe
+        loadStripeIndex();
+
+        // select row groups to read in the current stripe
+        sargsApplier->pickRowGroups(rowsInCurrentStripe,
+                                    rowIndexes,
+                                    bloomFilterIndex);
+        if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
+          // current stripe has at least one row group matching the predicate
+          break;
+        } else {
+          // advance to next stripe when current stripe has no matching rows
+          currentStripe += 1;
+          currentRowInStripe = 0;
+        }
+      }
+    } while (sargsApplier && currentStripe < lastStripe);
+
+    if (currentStripe < lastStripe) {
+      const Timezone& writerTimezone =

Review comment:
       Let's add a bit more documentation here.

##########
File path: c++/src/Reader.cc
##########
@@ -944,13 +1042,67 @@ namespace orc {
     // update row number
     previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
     currentRowInStripe += rowsToRead;
+
+    // check if we need to advance to next selected row group
+    if (sargsApplier) {
+      uint64_t nextRowToRead = advanceToNextRowGroup(currentRowInStripe,
+                                                     rowsInCurrentStripe,
+                                                     footer->rowindexstride(),
+                                                     sargsApplier->getRowGroups());
+      if (currentRowInStripe != nextRowToRead) {
+        // it is guaranteed to be at start of a row group
+        currentRowInStripe = nextRowToRead;
+        if (currentRowInStripe < rowsInCurrentStripe) {
+          seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()));
+        }
+      }
+    }
+
     if (currentRowInStripe >= rowsInCurrentStripe) {
       currentStripe += 1;
       currentRowInStripe = 0;
     }
     return rowsToRead != 0;
   }
 
+  uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize,
+                                           uint64_t currentRowInStripe,
+                                           uint64_t rowsInCurrentStripe,
+                                           uint64_t rowIndexStride,
+                                           const std::vector<bool>& includedRowGroups) {
+    uint64_t endRowInStripe = rowsInCurrentStripe;

Review comment:
       I would definitely copy the marker explanation comment (with its JIRA reference) from the Java side here -- and would also try to follow similar logic: if (includedRowGroups.empty()) batchSize = ... else ...
   Adding a debug log would also make sense.

##########
File path: c++/src/sargs/SargsApplier.cc
##########
@@ -25,7 +26,13 @@ namespace orc {
                                     const std::string& colName) {
     for (uint64_t i = 0; i != type.getSubtypeCount(); ++i) {
       if (type.getFieldName(i) == colName) {
-        return type.getSubtype(i)->getColumnId();
+        if (type.getKind() == CHAR || type.getKind() == VARCHAR) {

Review comment:
       Shall we leave this to the [consumer](https://github.com/apache/orc/blob/295ea5be6c2ae643bb3c3ca6db7b018aeb51bebc/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java#L378)? We don't have anything similar on the Java side.
   We should probably address this as part of ORC-612.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org