You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2019/07/03 16:26:52 UTC
[orc] branch master updated: ORC-513: [C++] Improve RowReaderImpl::seekToRow performance
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new c1fa2c1 ORC-513: [C++] Improve RowReaderImpl::seekToRow performance
c1fa2c1 is described below
commit c1fa2c13e2d5155edc0ff520f498575cc8fba404
Author: Gang Wu <ga...@alibaba-inc.com>
AuthorDate: Sat Jun 15 15:28:39 2019 +0800
ORC-513: [C++] Improve RowReaderImpl::seekToRow performance
Automatically leverage the row group index when seeking. Only the row
indexes of the selected columns are read.
Fixes #401
---
c++/src/ColumnReader.cc | 137 ++++++++++++++++++++++++++++++++++++++++++++++++
c++/src/ColumnReader.hh | 7 +++
c++/src/Reader.cc | 70 ++++++++++++++++++++++++-
c++/src/Reader.hh | 9 ++++
4 files changed, 222 insertions(+), 1 deletion(-)
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 573ec89..ab526a5 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -114,6 +114,13 @@ namespace orc {
rowBatch.hasNulls = false;
}
+ // Base step of every seekToRowGroup override: reposition the not-null
+ // decoder (when this column has one) using this column's provider.
+ // Subclasses call this first, then seek their own streams.
+ void ColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ if (notNullDecoder) {
+ PositionProvider& columnPositions = positions.at(columnId);
+ notNullDecoder->seek(columnPositions);
+ }
+ }
+
/**
* Expand an array of bytes in place to the corresponding array of longs.
* Has to work backwards so that they data isn't clobbered during the
@@ -141,6 +148,9 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char* notNull) override;
+
+ // Reposition the not-null and value decoders at a row group boundary.
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
BooleanColumnReader::BooleanColumnReader(const Type& type,
@@ -175,6 +185,12 @@ namespace orc {
expandBytesToLongs(ptr, numValues);
}
+ void BooleanColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ // Both seeks draw from the same provider, one after the other.
+ PositionProvider& columnPositions = positions.at(columnId);
+ ColumnReader::seekToRowGroup(positions);
+ rle->seek(columnPositions);
+ }
+
class ByteColumnReader: public ColumnReader {
private:
std::unique_ptr<orc::ByteRleDecoder> rle;
@@ -188,6 +204,9 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char* notNull) override;
+
+ // Reposition the not-null and byte RLE decoders at a row group boundary.
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
ByteColumnReader::ByteColumnReader(const Type& type,
@@ -222,6 +241,12 @@ namespace orc {
expandBytesToLongs(ptr, numValues);
}
+ void ByteColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ // The base class consumes the not-null positions first; the byte RLE
+ // decoder then continues from the same provider.
+ PositionProvider& columnPositions = positions.at(columnId);
+ ColumnReader::seekToRowGroup(positions);
+ rle->seek(columnPositions);
+ }
+
class IntegerColumnReader: public ColumnReader {
protected:
std::unique_ptr<orc::RleDecoder> rle;
@@ -235,6 +260,9 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char* notNull) override;
+
+ // Reposition the not-null and integer RLE decoders at a row group boundary.
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
IntegerColumnReader::IntegerColumnReader(const Type& type,
@@ -266,6 +294,12 @@ namespace orc {
numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
}
+ void IntegerColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ PositionProvider& columnPositions = positions.at(columnId);
+ // not-null positions first, then the value stream's positions
+ ColumnReader::seekToRowGroup(positions);
+ rle->seek(columnPositions);
+ }
+
class TimestampColumnReader: public ColumnReader {
private:
std::unique_ptr<orc::RleDecoder> secondsRle;
@@ -282,6 +316,9 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char* notNull) override;
+
+ // Reposition the not-null, seconds and nanoseconds decoders.
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
@@ -344,6 +381,13 @@ namespace orc {
}
}
+ void TimestampColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ PositionProvider& columnPositions = positions.at(columnId);
+ ColumnReader::seekToRowGroup(positions);
+ // One provider feeds both decoders, so the call order decides which
+ // recorded positions each one receives: seconds first, then nanos.
+ secondsRle->seek(columnPositions);
+ nanoRle->seek(columnPositions);
+ }
+
class DoubleColumnReader: public ColumnReader {
public:
DoubleColumnReader(const Type& type, StripeStreams& stripe);
@@ -355,6 +399,9 @@ namespace orc {
uint64_t numValues,
char* notNull) override;
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
+
private:
std::unique_ptr<SeekableInputStream> inputStream;
TypeKind columnKind;
@@ -483,6 +530,12 @@ namespace orc {
}
}
+ /**
+ * Reposition the not-null decoder and the raw value stream at the start of
+ * a row group, and discard any bytes buffered from the previous position.
+ */
+ void DoubleColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ ColumnReader::seekToRowGroup(positions);
+ inputStream->seek(positions.at(columnId));
+ // Drop the chunk cached from inputStream before the seek; otherwise the
+ // next read would decode stale pre-seek bytes. NOTE(review): relies on
+ // the bufferPointer/bufferEnd members declared outside this hunk.
+ bufferPointer = nullptr;
+ bufferEnd = nullptr;
+ }
+
class StringDictionaryColumnReader: public ColumnReader {
private:
std::shared_ptr<StringDictionary> dictionary;
@@ -501,6 +554,9 @@ namespace orc {
void nextEncoded(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char* notNull) override;
+
+ // Reposition the not-null decoder and the dictionary index decoder.
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
StringDictionaryColumnReader::StringDictionaryColumnReader
@@ -599,6 +655,13 @@ namespace orc {
rle->next(batch.index.data(), numValues, notNull);
}
+ void StringDictionaryColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ // Only the index decoder moves; the dictionary itself is left untouched.
+ PositionProvider& columnPositions = positions.at(columnId);
+ ColumnReader::seekToRowGroup(positions);
+ rle->seek(columnPositions);
+ }
+
class StringDirectColumnReader: public ColumnReader {
private:
std::unique_ptr<RleDecoder> lengthRle;
@@ -625,6 +688,9 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
+
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
StringDirectColumnReader::StringDirectColumnReader
@@ -760,6 +826,13 @@ namespace orc {
}
}
+ /**
+ * Reposition the not-null decoder, the blob stream and the length decoder
+ * at the start of a row group, and discard any buffered blob bytes.
+ */
+ void StringDirectColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ ColumnReader::seekToRowGroup(positions);
+ // Same provider for both streams: blob positions precede length positions.
+ blobStream->seek(positions.at(columnId));
+ lengthRle->seek(positions.at(columnId));
+ // Forget the blob chunk fetched before the seek so the next read pulls
+ // fresh data from blobStream. NOTE(review): relies on the
+ // lastBuffer/lastBufferLength members declared outside this hunk.
+ lastBuffer = nullptr;
+ lastBufferLength = 0;
+ }
+
class StructColumnReader: public ColumnReader {
private:
std::vector<ColumnReader*> children;
@@ -778,6 +851,9 @@ namespace orc {
uint64_t numValues,
char *notNull) override;
+ // Reposition this column and, recursively, every child reader.
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
+
private:
template<bool encoded>
void nextInternal(ColumnVectorBatch& rowBatch,
@@ -852,6 +928,16 @@ namespace orc {
}
}
+ void StructColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ ColumnReader::seekToRowGroup(positions);
+
+ // Each child looks up its own column id in the shared map.
+ for (ColumnReader* childReader : children) {
+ childReader->seekToRowGroup(positions);
+ }
+ }
class ListColumnReader: public ColumnReader {
private:
@@ -872,6 +958,9 @@ namespace orc {
uint64_t numValues,
char *notNull) override;
+ // Reposition the not-null and length decoders, then the element reader.
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
+
private:
template<bool encoded>
void nextInternal(ColumnVectorBatch& rowBatch,
@@ -973,6 +1062,15 @@ namespace orc {
}
}
+ void ListColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ PositionProvider& columnPositions = positions.at(columnId);
+ ColumnReader::seekToRowGroup(positions);
+ rle->seek(columnPositions);
+ // The element reader may be absent, so it is only repositioned when set.
+ if (child) {
+ child->seekToRowGroup(positions);
+ }
+ }
+
class MapColumnReader: public ColumnReader {
private:
std::unique_ptr<ColumnReader> keyReader;
@@ -993,6 +1091,9 @@ namespace orc {
uint64_t numValues,
char *notNull) override;
+ // Reposition the not-null and length decoders, then key and value readers.
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
+
private:
template<bool encoded>
void nextInternal(ColumnVectorBatch& rowBatch,
@@ -1114,6 +1215,18 @@ namespace orc {
}
}
+ void MapColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ PositionProvider& columnPositions = positions.at(columnId);
+ ColumnReader::seekToRowGroup(positions);
+ rle->seek(columnPositions);
+ // Key and value readers are optional; seek only those that exist.
+ if (keyReader) {
+ keyReader->seekToRowGroup(positions);
+ }
+ if (elementReader) {
+ elementReader->seekToRowGroup(positions);
+ }
+ }
+
class UnionColumnReader: public ColumnReader {
private:
std::unique_ptr<ByteRleDecoder> rle;
@@ -1135,6 +1248,9 @@ namespace orc {
uint64_t numValues,
char *notNull) override;
+ // Reposition the not-null and tag decoders, then each present child.
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
+
private:
template<bool encoded>
void nextInternal(ColumnVectorBatch& rowBatch,
@@ -1246,6 +1362,17 @@ namespace orc {
}
}
+ void UnionColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ PositionProvider& columnPositions = positions.at(columnId);
+ ColumnReader::seekToRowGroup(positions);
+ rle->seek(columnPositions);
+ // Slots without a reader are null and skipped.
+ for (size_t childIndex = 0; childIndex < numChildren; ++childIndex) {
+ const auto& childReader = childrenReader[childIndex];
+ if (childReader != nullptr) {
+ childReader->seekToRowGroup(positions);
+ }
+ }
+ }
+
/**
* Destructively convert the number from zigzag encoding to the
* natural signed representation.
@@ -1322,6 +1449,9 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
+
+ void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) override;
};
const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
@@ -1429,6 +1559,13 @@ namespace orc {
}
}
+ /**
+ * Reposition the not-null decoder, the value stream and the scale decoder
+ * at the start of a row group, and discard any buffered value bytes.
+ * Also used by Decimal128ColumnReader, which derives from this class.
+ */
+ void Decimal64ColumnReader::seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions) {
+ ColumnReader::seekToRowGroup(positions);
+ valueStream->seek(positions.at(columnId));
+ scaleDecoder->seek(positions.at(columnId));
+ // Drop the chunk cached from valueStream before the seek so the next
+ // value is decoded from the new position. NOTE(review): relies on the
+ // buffer/bufferEnd members declared outside this hunk.
+ buffer = nullptr;
+ bufferEnd = nullptr;
+ }
+
class Decimal128ColumnReader: public Decimal64ColumnReader {
public:
Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index ca65872..d3c810e 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -135,6 +135,13 @@ namespace orc {
next(rowBatch, numValues, notNull);
}
+ /**
+ * Seek to the beginning of a row group in the current stripe.
+ * @param positions map from column id to the PositionProvider holding that
+ * column's row-index positions for the target row group; a reader
+ * consumes the entry for its own column id and passes the whole map
+ * on to its child readers
+ * @throws std::out_of_range if an entry this reader needs is missing
+ * (implementations look entries up with positions.at())
+ */
+ virtual void seekToRowGroup(
+ std::unordered_map<uint64_t, PositionProvider>& positions);
+
};
/**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index c5627a7..a814fbc 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -292,7 +292,75 @@ namespace orc {
currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
previousRow = rowNumber;
startNextStripe();
- reader->skip(currentRowInStripe);
+
+ uint64_t rowsToSkip = currentRowInStripe;
+
+ // With a row index available, jump straight to the row group holding the
+ // target row and only skip the remaining rows inside that group.
+ const uint64_t rowIndexStride = footer->rowindexstride();
+ if (rowIndexStride > 0 &&
+ currentStripeInfo.indexlength() > 0) {
+ uint32_t rowGroupId =
+ static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
+ // Stay in 64-bit arithmetic so the in-group offset cannot wrap.
+ rowsToSkip = currentRowInStripe % rowIndexStride;
+
+ // Row group 0 begins where the freshly started stripe already is.
+ if (rowGroupId != 0) {
+ seekToRowGroup(rowGroupId);
+ }
+ }
+
+ reader->skip(rowsToSkip);
+ }
+
+ void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) {
+ // reset all previous row indexes
+ rowIndexes.clear();
+
+ // obtain row indexes for selected columns
+ uint64_t offset = currentStripeInfo.offset();
+ for (int i = 0; i < currentStripeFooter.streams_size(); ++i) {
+ const proto::Stream& pbStream = currentStripeFooter.streams(i);
+ uint64_t colId = pbStream.column();
+ if (selectedColumns[colId] && pbStream.has_kind()
+ && pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
+ std::unique_ptr<SeekableInputStream> inStream =
+ createDecompressor(getCompression(),
+ std::unique_ptr<SeekableInputStream>
+ (new SeekableFileInputStream
+ (contents->stream.get(),
+ offset,
+ pbStream.length(),
+ *contents->pool)),
+ getCompressionSize(),
+ *contents->pool);
+
+ proto::RowIndex rowIndex;
+ if (!rowIndex.ParseFromZeroCopyStream(inStream.get())) {
+ throw ParseError("Failed to parse the row index");
+ }
+
+ rowIndexes[colId] = std::move(rowIndex);
+ }
+ offset += pbStream.length();
+ }
+
+ // store positions for selected columns. Each PositionProvider is built
+ // from a reference to its list (presumably keeping an iterator into it),
+ // so the lists must never relocate: std::list keeps elements in place as
+ // it grows, whereas std::vector may move/copy them on reallocation.
+ std::list<std::list<uint64_t>> positions;
+ // store position providers for selected columns
+ std::unordered_map<uint64_t, PositionProvider> positionProviders;
+
+ for (const auto& rowIndex : rowIndexes) {
+ uint64_t colId = rowIndex.first;
+ const proto::RowIndex& index = rowIndex.second;
+ // The index comes from the file; never trust it to cover the group.
+ if (static_cast<int64_t>(rowGroupEntryId) >= index.entry_size()) {
+ throw ParseError("Row group id is out of range of the row index");
+ }
+ const proto::RowIndexEntry& entry =
+ index.entry(static_cast<int32_t>(rowGroupEntryId));
+
+ // copy index positions for a specific column
+ positions.push_back({});
+ auto& position = positions.back();
+ for (int pos = 0; pos != entry.positions_size(); ++pos) {
+ position.push_back(entry.positions(pos));
+ }
+ positionProviders.emplace(colId, PositionProvider(position));
+ }
+
+ reader->seekToRowGroup(positionProviders);
}
const FileContents& RowReaderImpl::getFileContents() const {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 75eb0bb..a381956 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -140,6 +140,15 @@ namespace orc {
// internal methods
void startNextStripe();
+ // Row indexes of the selected columns in the current stripe, keyed by
+ // column id; cleared and rebuilt by every seekToRowGroup() call.
+ std::unordered_map<uint64_t, proto::RowIndex> rowIndexes;
+
+ /**
+ * Position all column readers at the first row of a row group
+ * within the current stripe.
+ * @param rowGroupEntryId zero-based index of the target row group
+ */
+ void seekToRowGroup(uint32_t rowGroupEntryId);
+
public:
/**
* Constructor that lets the user specify additional options.