You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2019/07/03 16:26:52 UTC

[orc] branch master updated: ORC-513: [C++] Improve RowReaderImpl::seekToRow performance

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new c1fa2c1  ORC-513: [C++] Improve RowReaderImpl::seekToRow performance
c1fa2c1 is described below

commit c1fa2c13e2d5155edc0ff520f498575cc8fba404
Author: Gang Wu <ga...@alibaba-inc.com>
AuthorDate: Sat Jun 15 15:28:39 2019 +0800

    ORC-513: [C++] Improve RowReaderImpl::seekToRow performance
    
    Automatically leverage row group index to seek. Optimized for selected
    columns only.
    
    Fixes #401
---
 c++/src/ColumnReader.cc | 137 ++++++++++++++++++++++++++++++++++++++++++++++++
 c++/src/ColumnReader.hh |   7 +++
 c++/src/Reader.cc       |  70 ++++++++++++++++++++++++-
 c++/src/Reader.hh       |   9 ++++
 4 files changed, 222 insertions(+), 1 deletion(-)

diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 573ec89..ab526a5 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -114,6 +114,13 @@ namespace orc {
     rowBatch.hasNulls = false;
   }
 
+  void ColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    if (notNullDecoder.get()) {
+      notNullDecoder->seek(positions.at(columnId));
+    }
+  }
+
   /**
    * Expand an array of bytes in place to the corresponding array of longs.
    * Has to work backwards so that they data isn't clobbered during the
@@ -141,6 +148,9 @@ namespace orc {
     void next(ColumnVectorBatch& rowBatch,
               uint64_t numValues,
               char* notNull) override;
+
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
   };
 
   BooleanColumnReader::BooleanColumnReader(const Type& type,
@@ -175,6 +185,12 @@ namespace orc {
     expandBytesToLongs(ptr, numValues);
   }
 
+  void BooleanColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    rle->seek(positions.at(columnId));
+  }
+
   class ByteColumnReader: public ColumnReader {
   private:
     std::unique_ptr<orc::ByteRleDecoder> rle;
@@ -188,6 +204,9 @@ namespace orc {
     void next(ColumnVectorBatch& rowBatch,
               uint64_t numValues,
               char* notNull) override;
+
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
   };
 
   ByteColumnReader::ByteColumnReader(const Type& type,
@@ -222,6 +241,12 @@ namespace orc {
     expandBytesToLongs(ptr, numValues);
   }
 
+  void ByteColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    rle->seek(positions.at(columnId));
+  }
+
   class IntegerColumnReader: public ColumnReader {
   protected:
     std::unique_ptr<orc::RleDecoder> rle;
@@ -235,6 +260,9 @@ namespace orc {
     void next(ColumnVectorBatch& rowBatch,
               uint64_t numValues,
               char* notNull) override;
+
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
   };
 
   IntegerColumnReader::IntegerColumnReader(const Type& type,
@@ -266,6 +294,12 @@ namespace orc {
               numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
   }
 
+  void IntegerColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    rle->seek(positions.at(columnId));
+  }
+
   class TimestampColumnReader: public ColumnReader {
   private:
     std::unique_ptr<orc::RleDecoder> secondsRle;
@@ -282,6 +316,9 @@ namespace orc {
     void next(ColumnVectorBatch& rowBatch,
               uint64_t numValues,
               char* notNull) override;
+
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
   };
 
 
@@ -344,6 +381,13 @@ namespace orc {
     }
   }
 
+  void TimestampColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    secondsRle->seek(positions.at(columnId));
+    nanoRle->seek(positions.at(columnId));
+  }
+
   class DoubleColumnReader: public ColumnReader {
   public:
     DoubleColumnReader(const Type& type, StripeStreams& stripe);
@@ -355,6 +399,9 @@ namespace orc {
               uint64_t numValues,
               char* notNull) override;
 
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
+
   private:
     std::unique_ptr<SeekableInputStream> inputStream;
     TypeKind columnKind;
@@ -483,6 +530,12 @@ namespace orc {
     }
   }
 
+  void DoubleColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    inputStream->seek(positions.at(columnId));
+  }
+
   class StringDictionaryColumnReader: public ColumnReader {
   private:
     std::shared_ptr<StringDictionary> dictionary;
@@ -501,6 +554,9 @@ namespace orc {
     void nextEncoded(ColumnVectorBatch& rowBatch,
                       uint64_t numValues,
                       char* notNull) override;
+
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
   };
 
   StringDictionaryColumnReader::StringDictionaryColumnReader
@@ -599,6 +655,13 @@ namespace orc {
     rle->next(batch.index.data(), numValues, notNull);
   }
 
+  void StringDictionaryColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    rle->seek(positions.at(columnId));
+  }
+
+
   class StringDirectColumnReader: public ColumnReader {
   private:
     std::unique_ptr<RleDecoder> lengthRle;
@@ -625,6 +688,9 @@ namespace orc {
     void next(ColumnVectorBatch& rowBatch,
               uint64_t numValues,
               char *notNull) override;
+
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
   };
 
   StringDirectColumnReader::StringDirectColumnReader
@@ -760,6 +826,13 @@ namespace orc {
     }
   }
 
+  void StringDirectColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    blobStream->seek(positions.at(columnId));
+    lengthRle->seek(positions.at(columnId));
+  }
+
   class StructColumnReader: public ColumnReader {
   private:
     std::vector<ColumnReader*> children;
@@ -778,6 +851,9 @@ namespace orc {
               uint64_t numValues,
               char *notNull) override;
 
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
+
   private:
     template<bool encoded>
     void nextInternal(ColumnVectorBatch& rowBatch,
@@ -852,6 +928,16 @@ namespace orc {
     }
   }
 
+  void StructColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+
+    for(std::vector<ColumnReader*>::iterator ptr = children.begin();
+        ptr != children.end();
+        ++ptr) {
+      (*ptr)->seekToRowGroup(positions);
+    }
+  }
 
   class ListColumnReader: public ColumnReader {
   private:
@@ -872,6 +958,9 @@ namespace orc {
               uint64_t numValues,
               char *notNull) override;
 
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
+
   private:
     template<bool encoded>
     void nextInternal(ColumnVectorBatch& rowBatch,
@@ -973,6 +1062,15 @@ namespace orc {
     }
   }
 
+  void ListColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    rle->seek(positions.at(columnId));
+    if (child.get()) {
+      child->seekToRowGroup(positions);
+    }
+  }
+
   class MapColumnReader: public ColumnReader {
   private:
     std::unique_ptr<ColumnReader> keyReader;
@@ -993,6 +1091,9 @@ namespace orc {
                      uint64_t numValues,
                      char *notNull) override;
 
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
+
   private:
     template<bool encoded>
     void nextInternal(ColumnVectorBatch& rowBatch,
@@ -1114,6 +1215,18 @@ namespace orc {
     }
   }
 
+  void MapColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    rle->seek(positions.at(columnId));
+    if (keyReader.get()) {
+      keyReader->seekToRowGroup(positions);
+    }
+    if (elementReader.get()) {
+      elementReader->seekToRowGroup(positions);
+    }
+  }
+
   class UnionColumnReader: public ColumnReader {
   private:
     std::unique_ptr<ByteRleDecoder> rle;
@@ -1135,6 +1248,9 @@ namespace orc {
                      uint64_t numValues,
                      char *notNull) override;
 
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
+
   private:
     template<bool encoded>
     void nextInternal(ColumnVectorBatch& rowBatch,
@@ -1246,6 +1362,17 @@ namespace orc {
     }
   }
 
+  void UnionColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    rle->seek(positions.at(columnId));
+    for(size_t i = 0; i < numChildren; ++i) {
+      if (childrenReader[i] != nullptr) {
+        childrenReader[i]->seekToRowGroup(positions);
+      }
+    }
+  }
+
   /**
    * Destructively convert the number from zigzag encoding to the
    * natural signed representation.
@@ -1322,6 +1449,9 @@ namespace orc {
     void next(ColumnVectorBatch& rowBatch,
               uint64_t numValues,
               char *notNull) override;
+
+    void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions) override;
   };
   const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
   const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
@@ -1429,6 +1559,13 @@ namespace orc {
     }
   }
 
+  void Decimal64ColumnReader::seekToRowGroup(
+    std::unordered_map<uint64_t, PositionProvider>& positions) {
+    ColumnReader::seekToRowGroup(positions);
+    valueStream->seek(positions.at(columnId));
+    scaleDecoder->seek(positions.at(columnId));
+  }
+
   class Decimal128ColumnReader: public Decimal64ColumnReader {
   public:
     Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index ca65872..d3c810e 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -135,6 +135,13 @@ namespace orc {
       next(rowBatch, numValues, notNull);
     }
 
+    /**
+     * Seek to beginning of a row group in the current stripe
+     * @param positions a map from column id to the PositionProvider for that column
+     */
+    virtual void seekToRowGroup(
+      std::unordered_map<uint64_t, PositionProvider>& positions);
+
   };
 
   /**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index c5627a7..a814fbc 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -292,7 +292,75 @@ namespace orc {
     currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
     previousRow = rowNumber;
     startNextStripe();
-    reader->skip(currentRowInStripe);
+
+    uint64_t rowsToSkip = currentRowInStripe;
+
+    if (footer->rowindexstride() > 0 &&
+        currentStripeInfo.indexlength() > 0) {
+      uint32_t rowGroupId =
+        static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride());
+      rowsToSkip -= rowGroupId * footer->rowindexstride();
+
+      if (rowGroupId != 0) {
+        seekToRowGroup(rowGroupId);
+      }
+    }
+
+    reader->skip(rowsToSkip);
+  }
+
+  void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) {
+    // reset all previous row indexes
+    rowIndexes.clear();
+
+    // obtain row indexes for selected columns
+    uint64_t offset = currentStripeInfo.offset();
+    for (int i = 0; i < currentStripeFooter.streams_size(); ++i) {
+      const proto::Stream& pbStream = currentStripeFooter.streams(i);
+      uint64_t colId = pbStream.column();
+      if (selectedColumns[colId] && pbStream.has_kind()
+          && pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
+        std::unique_ptr<SeekableInputStream> inStream =
+          createDecompressor(getCompression(),
+                             std::unique_ptr<SeekableInputStream>
+                               (new SeekableFileInputStream
+                                  (contents->stream.get(),
+                                   offset,
+                                   pbStream.length(),
+                                   *contents->pool)),
+                             getCompressionSize(),
+                             *contents->pool);
+
+        proto::RowIndex rowIndex;
+        if (!rowIndex.ParseFromZeroCopyStream(inStream.get())) {
+          throw ParseError("Failed to parse the row index");
+        }
+
+        rowIndexes[colId] = rowIndex;
+      }
+      offset += pbStream.length();
+    }
+
+    // store positions for selected columns
+    std::vector<std::list<uint64_t>> positions;
+    // store position providers for selected columns
+    std::unordered_map<uint64_t, PositionProvider> positionProviders;
+
+    for (const auto& rowIndex : rowIndexes) {
+      uint64_t colId = rowIndex.first;
+      const proto::RowIndexEntry& entry =
+        rowIndex.second.entry(static_cast<int32_t>(rowGroupEntryId));
+
+      // copy index positions for a specific column
+      positions.push_back({});
+      auto& position = positions.back();
+      for (int pos = 0; pos != entry.positions_size(); ++pos) {
+        position.push_back(entry.positions(pos));
+      }
+      positionProviders.emplace(std::make_pair(colId, PositionProvider(position)));
+    }
+
+    reader->seekToRowGroup(positionProviders);
   }
 
   const FileContents& RowReaderImpl::getFileContents() const {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 75eb0bb..a381956 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -140,6 +140,15 @@ namespace orc {
     // internal methods
     void startNextStripe();
 
+    // row index of current stripe with column id as the key
+    std::unordered_map<uint64_t, proto::RowIndex> rowIndexes;
+
+    /**
+     * Seek to the start of a row group in the current stripe
+     * @param rowGroupEntryId the row group id to seek to
+     */
+    void seekToRowGroup(uint32_t rowGroupEntryId);
+
   public:
    /**
     * Constructor that lets the user specify additional options.