You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/01/04 19:23:08 UTC
[2/3] orc git commit: ORC-58. Split current Reader into RowReader and
Reader (Deepak Majeti via omalley)
http://git-wip-us.apache.org/repos/asf/orc/blob/6bf63bb1/c++/src/Reader.cc
----------------------------------------------------------------------
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 91f4ea1..3c618b2 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -16,21 +16,16 @@
* limitations under the License.
*/
-#include "orc/Int128.hh"
-#include "orc/OrcFile.hh"
-#include "orc/Reader.hh"
-#include "Adaptor.hh"
-#include "ColumnReader.hh"
-#include "Exceptions.hh"
-#include "RLE.hh"
-#include "TypeImpl.hh"
+#include "Options.hh"
+#include "Reader.hh"
+#include "Statistics.hh"
+#include "StripeStream.hh"
#include "wrap/coded-stream-wrapper.h"
#include <algorithm>
#include <iostream>
-#include <limits>
#include <memory>
#include <sstream>
#include <string>
@@ -80,1213 +75,160 @@ namespace orc {
return buffer.str();
}
- enum ColumnSelection {
- ColumnSelection_NONE = 0,
- ColumnSelection_NAMES = 1,
- ColumnSelection_FIELD_IDS = 2,
- ColumnSelection_TYPE_IDS = 3,
- };
-
- struct ReaderOptionsPrivate {
- ColumnSelection selection;
- std::list<uint64_t> includedColumnIndexes;
- std::list<std::string> includedColumnNames;
- uint64_t dataStart;
- uint64_t dataLength;
- uint64_t tailLocation;
- bool throwOnHive11DecimalOverflow;
- int32_t forcedScaleOnHive11Decimal;
- std::ostream* errorStream;
- MemoryPool* memoryPool;
- std::string serializedTail;
-
- ReaderOptionsPrivate() {
- selection = ColumnSelection_NONE;
- dataStart = 0;
- dataLength = std::numeric_limits<uint64_t>::max();
- tailLocation = std::numeric_limits<uint64_t>::max();
- throwOnHive11DecimalOverflow = true;
- forcedScaleOnHive11Decimal = 6;
- errorStream = &std::cerr;
- memoryPool = getDefaultPool();
+ uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
+ if (ps.has_compressionblocksize()) {
+ return ps.compressionblocksize();
+ } else {
+ return 256 * 1024;
}
- };
-
- ReaderOptions::ReaderOptions():
- privateBits(std::unique_ptr<ReaderOptionsPrivate>
- (new ReaderOptionsPrivate())) {
- // PASS
}
- ReaderOptions::ReaderOptions(const ReaderOptions& rhs):
- privateBits(std::unique_ptr<ReaderOptionsPrivate>
- (new ReaderOptionsPrivate(*(rhs.privateBits.get())))) {
- // PASS
- }
-
- ReaderOptions::ReaderOptions(ReaderOptions& rhs) {
- // swap privateBits with rhs
- ReaderOptionsPrivate* l = privateBits.release();
- privateBits.reset(rhs.privateBits.release());
- rhs.privateBits.reset(l);
- }
-
- ReaderOptions& ReaderOptions::operator=(const ReaderOptions& rhs) {
- if (this != &rhs) {
- privateBits.reset(new ReaderOptionsPrivate(*(rhs.privateBits.get())));
+ CompressionKind convertCompressionKind(const proto::PostScript& ps) {
+ if (ps.has_compression()) {
+ return static_cast<CompressionKind>(ps.compression());
+ } else {
+ throw ParseError("Unknown compression type");
}
- return *this;
- }
-
- ReaderOptions::~ReaderOptions() {
- // PASS
- }
-
- ReaderOptions& ReaderOptions::include(const std::list<uint64_t>& include) {
- privateBits->selection = ColumnSelection_FIELD_IDS;
- privateBits->includedColumnIndexes.assign(include.begin(), include.end());
- privateBits->includedColumnNames.clear();
- return *this;
- }
-
- ReaderOptions& ReaderOptions::include(const std::list<std::string>& include) {
- privateBits->selection = ColumnSelection_NAMES;
- privateBits->includedColumnNames.assign(include.begin(), include.end());
- privateBits->includedColumnIndexes.clear();
- return *this;
- }
-
- ReaderOptions& ReaderOptions::includeTypes(const std::list<uint64_t>& types) {
- privateBits->selection = ColumnSelection_TYPE_IDS;
- privateBits->includedColumnIndexes.assign(types.begin(), types.end());
- privateBits->includedColumnNames.clear();
- return *this;
- }
-
- ReaderOptions& ReaderOptions::range(uint64_t offset,
- uint64_t length) {
- privateBits->dataStart = offset;
- privateBits->dataLength = length;
- return *this;
- }
-
- ReaderOptions& ReaderOptions::setTailLocation(uint64_t offset) {
- privateBits->tailLocation = offset;
- return *this;
- }
-
- ReaderOptions& ReaderOptions::setMemoryPool(MemoryPool& pool) {
- privateBits->memoryPool = &pool;
- return *this;
- }
-
- ReaderOptions& ReaderOptions::setSerializedFileTail(const std::string& value
- ) {
- privateBits->serializedTail = value;
- return *this;
- }
-
- MemoryPool* ReaderOptions::getMemoryPool() const{
- return privateBits->memoryPool;
- }
-
- bool ReaderOptions::getIndexesSet() const {
- return privateBits->selection == ColumnSelection_FIELD_IDS;
- }
-
- bool ReaderOptions::getTypeIdsSet() const {
- return privateBits->selection == ColumnSelection_TYPE_IDS;
- }
-
- const std::list<uint64_t>& ReaderOptions::getInclude() const {
- return privateBits->includedColumnIndexes;
- }
-
- bool ReaderOptions::getNamesSet() const {
- return privateBits->selection == ColumnSelection_NAMES;
- }
-
- const std::list<std::string>& ReaderOptions::getIncludeNames() const {
- return privateBits->includedColumnNames;
- }
-
- uint64_t ReaderOptions::getOffset() const {
- return privateBits->dataStart;
- }
-
- uint64_t ReaderOptions::getLength() const {
- return privateBits->dataLength;
}
- uint64_t ReaderOptions::getTailLocation() const {
- return privateBits->tailLocation;
- }
-
- ReaderOptions& ReaderOptions::throwOnHive11DecimalOverflow(bool shouldThrow){
- privateBits->throwOnHive11DecimalOverflow = shouldThrow;
- return *this;
- }
-
- bool ReaderOptions::getThrowOnHive11DecimalOverflow() const {
- return privateBits->throwOnHive11DecimalOverflow;
- }
-
- ReaderOptions& ReaderOptions::forcedScaleOnHive11Decimal(int32_t forcedScale
- ) {
- privateBits->forcedScaleOnHive11Decimal = forcedScale;
- return *this;
- }
-
- int32_t ReaderOptions::getForcedScaleOnHive11Decimal() const {
- return privateBits->forcedScaleOnHive11Decimal;
- }
-
- ReaderOptions& ReaderOptions::setErrorStream(std::ostream& stream) {
- privateBits->errorStream = &stream;
- return *this;
- }
-
- std::ostream* ReaderOptions::getErrorStream() const {
- return privateBits->errorStream;
- }
-
- std::string ReaderOptions::getSerializedFileTail() const {
- return privateBits->serializedTail;
- }
-
- StreamInformation::~StreamInformation() {
- // PASS
- }
-
- StripeInformation::~StripeInformation() {
- // PASS
- }
-
- class ColumnStatisticsImpl: public ColumnStatistics {
- private:
- uint64_t valueCount;
-
- public:
- ColumnStatisticsImpl(const proto::ColumnStatistics& stats);
- virtual ~ColumnStatisticsImpl();
-
- uint64_t getNumberOfValues() const override {
- return valueCount;
- }
-
- std::string toString() const override {
- std::ostringstream buffer;
- buffer << "Column has " << valueCount << " values" << std::endl;
- return buffer.str();
- }
- };
-
- class BinaryColumnStatisticsImpl: public BinaryColumnStatistics {
- private:
- bool _hasTotalLength;
- uint64_t valueCount;
- uint64_t totalLength;
-
- public:
- BinaryColumnStatisticsImpl(const proto::ColumnStatistics& stats,
- bool correctStats);
- virtual ~BinaryColumnStatisticsImpl();
-
- bool hasTotalLength() const override {
- return _hasTotalLength;
- }
- uint64_t getNumberOfValues() const override {
- return valueCount;
- }
-
- uint64_t getTotalLength() const override {
- if(_hasTotalLength){
- return totalLength;
- }else{
- throw ParseError("Total length is not defined.");
- }
- }
-
- std::string toString() const override {
- std::ostringstream buffer;
- buffer << "Data type: Binary" << std::endl
- << "Values: " << valueCount << std::endl;
- if(_hasTotalLength){
- buffer << "Total length: " << totalLength << std::endl;
- }else{
- buffer << "Total length: not defined" << std::endl;
- }
- return buffer.str();
- }
- };
-
- class BooleanColumnStatisticsImpl: public BooleanColumnStatistics {
- private:
- bool _hasCount;
- uint64_t valueCount;
- uint64_t trueCount;
-
- public:
- BooleanColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats);
- virtual ~BooleanColumnStatisticsImpl();
-
- bool hasCount() const override {
- return _hasCount;
- }
-
- uint64_t getNumberOfValues() const override {
- return valueCount;
- }
-
- uint64_t getFalseCount() const override {
- if(_hasCount){
- return valueCount - trueCount;
- }else{
- throw ParseError("False count is not defined.");
- }
- }
-
- uint64_t getTrueCount() const override {
- if(_hasCount){
- return trueCount;
- }else{
- throw ParseError("True count is not defined.");
- }
- }
-
- std::string toString() const override {
- std::ostringstream buffer;
- buffer << "Data type: Boolean" << std::endl
- << "Values: " << valueCount << std::endl;
- if(_hasCount){
- buffer << "(true: " << trueCount << "; false: "
- << valueCount - trueCount << ")" << std::endl;
- } else {
- buffer << "(true: not defined; false: not defined)" << std::endl;
- buffer << "True and false count are not defined" << std::endl;
- }
- return buffer.str();
- }
- };
-
- class DateColumnStatisticsImpl: public DateColumnStatistics {
- private:
- bool _hasMinimum;
- bool _hasMaximum;
- uint64_t valueCount;
- int32_t minimum;
- int32_t maximum;
-
- public:
- DateColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats);
- virtual ~DateColumnStatisticsImpl();
-
- bool hasMinimum() const override {
- return _hasMinimum;
- }
-
- bool hasMaximum() const override {
- return _hasMaximum;
- }
-
- uint64_t getNumberOfValues() const override {
- return valueCount;
- }
-
- int32_t getMinimum() const override {
- if(_hasMinimum){
- return minimum;
- }else{
- throw ParseError("Minimum is not defined.");
- }
- }
-
- int32_t getMaximum() const override {
- if(_hasMaximum){
- return maximum;
- }else{
- throw ParseError("Maximum is not defined.");
- }
- }
-
- std::string toString() const override {
- std::ostringstream buffer;
- buffer << "Data type: Date" << std::endl
- << "Values: " << valueCount << std::endl;
- if(_hasMinimum){
- buffer << "Minimum: " << minimum << std::endl;
- }else{
- buffer << "Minimum: not defined" << std::endl;
- }
-
- if(_hasMaximum){
- buffer << "Maximum: " << maximum << std::endl;
- }else{
- buffer << "Maximum: not defined" << std::endl;
- }
- return buffer.str();
- }
- };
-
- class DecimalColumnStatisticsImpl: public DecimalColumnStatistics {
- private:
- bool _hasMinimum;
- bool _hasMaximum;
- bool _hasSum;
- uint64_t valueCount;
- std::string minimum;
- std::string maximum;
- std::string sum;
-
- public:
- DecimalColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats);
- virtual ~DecimalColumnStatisticsImpl();
-
- bool hasMinimum() const override {
- return _hasMinimum;
- }
-
- bool hasMaximum() const override {
- return _hasMaximum;
- }
-
- bool hasSum() const override {
- return _hasSum;
- }
-
- uint64_t getNumberOfValues() const override {
- return valueCount;
- }
-
- Decimal getMinimum() const override {
- if(_hasMinimum){
- return Decimal(minimum);
- }else{
- throw ParseError("Minimum is not defined.");
- }
- }
-
- Decimal getMaximum() const override {
- if(_hasMaximum){
- return Decimal(maximum);
- }else{
- throw ParseError("Maximum is not defined.");
- }
- }
-
- Decimal getSum() const override {
- if(_hasSum){
- return Decimal(sum);
- }else{
- throw ParseError("Sum is not defined.");
- }
- }
-
- std::string toString() const override {
- std::ostringstream buffer;
- buffer << "Data type: Decimal" << std::endl
- << "Values: " << valueCount << std::endl;
- if(_hasMinimum){
- buffer << "Minimum: " << minimum << std::endl;
- }else{
- buffer << "Minimum: not defined" << std::endl;
- }
-
- if(_hasMaximum){
- buffer << "Maximum: " << maximum << std::endl;
- }else{
- buffer << "Maximum: not defined" << std::endl;
- }
-
- if(_hasSum){
- buffer << "Sum: " << sum << std::endl;
- }else{
- buffer << "Sum: not defined" << std::endl;
- }
-
- return buffer.str();
- }
- };
-
- class DoubleColumnStatisticsImpl: public DoubleColumnStatistics {
- private:
- bool _hasMinimum;
- bool _hasMaximum;
- bool _hasSum;
- uint64_t valueCount;
- double minimum;
- double maximum;
- double sum;
-
- public:
- DoubleColumnStatisticsImpl(const proto::ColumnStatistics& stats);
- virtual ~DoubleColumnStatisticsImpl();
-
- bool hasMinimum() const override {
- return _hasMinimum;
- }
-
- bool hasMaximum() const override {
- return _hasMaximum;
- }
-
- bool hasSum() const override {
- return _hasSum;
- }
-
- uint64_t getNumberOfValues() const override {
- return valueCount;
- }
-
- double getMinimum() const override {
- if(_hasMinimum){
- return minimum;
- }else{
- throw ParseError("Minimum is not defined.");
- }
- }
-
- double getMaximum() const override {
- if(_hasMaximum){
- return maximum;
- }else{
- throw ParseError("Maximum is not defined.");
- }
- }
-
- double getSum() const override {
- if(_hasSum){
- return sum;
- }else{
- throw ParseError("Sum is not defined.");
- }
- }
-
- std::string toString() const override {
- std::ostringstream buffer;
- buffer << "Data type: Double" << std::endl
- << "Values: " << valueCount << std::endl;
- if(_hasMinimum){
- buffer << "Minimum: " << minimum << std::endl;
- }else{
- buffer << "Minimum: not defined" << std::endl;
- }
-
- if(_hasMaximum){
- buffer << "Maximum: " << maximum << std::endl;
- }else{
- buffer << "Maximum: not defined" << std::endl;
- }
-
- if(_hasSum){
- buffer << "Sum: " << sum << std::endl;
- }else{
- buffer << "Sum: not defined" << std::endl;
- }
- return buffer.str();
- }
- };
-
- class IntegerColumnStatisticsImpl: public IntegerColumnStatistics {
- private:
- bool _hasMinimum;
- bool _hasMaximum;
- bool _hasSum;
- uint64_t valueCount;
- int64_t minimum;
- int64_t maximum;
- int64_t sum;
-
- public:
- IntegerColumnStatisticsImpl(const proto::ColumnStatistics& stats);
- virtual ~IntegerColumnStatisticsImpl();
-
- bool hasMinimum() const override {
- return _hasMinimum;
- }
-
- bool hasMaximum() const override {
- return _hasMaximum;
- }
-
- bool hasSum() const override {
- return _hasSum;
- }
-
- uint64_t getNumberOfValues() const override {
- return valueCount;
- }
-
- int64_t getMinimum() const override {
- if(_hasMinimum){
- return minimum;
- }else{
- throw ParseError("Minimum is not defined.");
- }
- }
-
- int64_t getMaximum() const override {
- if(_hasMaximum){
- return maximum;
- }else{
- throw ParseError("Maximum is not defined.");
- }
- }
-
- int64_t getSum() const override {
- if(_hasSum){
- return sum;
- }else{
- throw ParseError("Sum is not defined.");
- }
- }
-
- std::string toString() const override {
- std::ostringstream buffer;
- buffer << "Data type: Integer" << std::endl
- << "Values: " << valueCount << std::endl;
- if(_hasMinimum){
- buffer << "Minimum: " << minimum << std::endl;
- }else{
- buffer << "Minimum: not defined" << std::endl;
- }
-
- if(_hasMaximum){
- buffer << "Maximum: " << maximum << std::endl;
- }else{
- buffer << "Maximum: not defined" << std::endl;
- }
-
- if(_hasSum){
- buffer << "Sum: " << sum << std::endl;
- }else{
- buffer << "Sum: not defined" << std::endl;
- }
- return buffer.str();
- }
- };
-
- class StringColumnStatisticsImpl: public StringColumnStatistics {
- private:
- bool _hasMinimum;
- bool _hasMaximum;
- bool _hasTotalLength;
- uint64_t valueCount;
- std::string minimum;
- std::string maximum;
- uint64_t totalLength;
-
- public:
- StringColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats);
- virtual ~StringColumnStatisticsImpl();
-
- bool hasMinimum() const override {
- return _hasMinimum;
- }
-
- bool hasMaximum() const override {
- return _hasMaximum;
- }
-
- bool hasTotalLength() const override {
- return _hasTotalLength;
- }
-
- uint64_t getNumberOfValues() const override {
- return valueCount;
- }
-
- std::string getMinimum() const override {
- if(_hasMinimum){
- return minimum;
- }else{
- throw ParseError("Minimum is not defined.");
- }
- }
-
- std::string getMaximum() const override {
- if(_hasMaximum){
- return maximum;
- }else{
- throw ParseError("Maximum is not defined.");
- }
- }
-
- uint64_t getTotalLength() const override {
- if(_hasTotalLength){
- return totalLength;
- }else{
- throw ParseError("Total length is not defined.");
- }
- }
-
- std::string toString() const override {
- std::ostringstream buffer;
- buffer << "Data type: String" << std::endl
- << "Values: " << valueCount << std::endl;
- if(_hasMinimum){
- buffer << "Minimum: " << minimum << std::endl;
- }else{
- buffer << "Minimum is not defined" << std::endl;
- }
-
- if(_hasMaximum){
- buffer << "Maximum: " << maximum << std::endl;
- }else{
- buffer << "Maximum is not defined" << std::endl;
- }
-
- if(_hasTotalLength){
- buffer << "Total length: " << totalLength << std::endl;
- }else{
- buffer << "Total length is not defined" << std::endl;
- }
- return buffer.str();
- }
- };
-
- class TimestampColumnStatisticsImpl: public TimestampColumnStatistics {
- private:
- bool _hasMinimum;
- bool _hasMaximum;
- uint64_t valueCount;
- int64_t minimum;
- int64_t maximum;
-
- public:
- TimestampColumnStatisticsImpl(const proto::ColumnStatistics& stats,
- bool correctStats);
- virtual ~TimestampColumnStatisticsImpl();
-
- bool hasMinimum() const override {
- return _hasMinimum;
- }
-
- bool hasMaximum() const override {
- return _hasMaximum;
- }
-
- uint64_t getNumberOfValues() const override {
- return valueCount;
- }
-
- int64_t getMinimum() const override {
- if(_hasMinimum){
- return minimum;
- }else{
- throw ParseError("Minimum is not defined.");
- }
- }
-
- int64_t getMaximum() const override {
- if(_hasMaximum){
- return maximum;
- }else{
- throw ParseError("Maximum is not defined.");
- }
- }
-
- std::string toString() const override {
- std::ostringstream buffer;
- buffer << "Data type: Timestamp" << std::endl
- << "Values: " << valueCount << std::endl;
- if(_hasMinimum){
- buffer << "Minimum: " << minimum << std::endl;
- }else{
- buffer << "Minimum is not defined" << std::endl;
- }
-
- if(_hasMaximum){
- buffer << "Maximum: " << maximum << std::endl;
- }else{
- buffer << "Maximum is not defined" << std::endl;
+ std::string ColumnSelector::toDotColumnPath() {
+ if (columns.empty()) {
+ return std::string();
}
- return buffer.str();
- }
- };
-
- std::string streamKindToString(StreamKind kind) {
- switch (static_cast<int>(kind)) {
- case StreamKind_PRESENT:
- return "present";
- case StreamKind_DATA:
- return "data";
- case StreamKind_LENGTH:
- return "length";
- case StreamKind_DICTIONARY_DATA:
- return "dictionary";
- case StreamKind_DICTIONARY_COUNT:
- return "dictionary count";
- case StreamKind_SECONDARY:
- return "secondary";
- case StreamKind_ROW_INDEX:
- return "index";
- case StreamKind_BLOOM_FILTER:
- return "bloom";
- }
- std::stringstream buffer;
- buffer << "unknown - " << kind;
- return buffer.str();
- }
-
- std::string columnEncodingKindToString(ColumnEncodingKind kind) {
- switch (static_cast<int>(kind)) {
- case ColumnEncodingKind_DIRECT:
- return "direct";
- case ColumnEncodingKind_DICTIONARY:
- return "dictionary";
- case ColumnEncodingKind_DIRECT_V2:
- return "direct rle2";
- case ColumnEncodingKind_DICTIONARY_V2:
- return "dictionary rle2";
- }
- std::stringstream buffer;
- buffer << "unknown - " << kind;
- return buffer.str();
- }
-
- class StreamInformationImpl: public StreamInformation {
- private:
- StreamKind kind;
- uint64_t column;
- uint64_t offset;
- uint64_t length;
- public:
- StreamInformationImpl(uint64_t _offset,
- const proto::Stream& stream
- ): kind(static_cast<StreamKind>(stream.kind())),
- column(stream.column()),
- offset(_offset),
- length(stream.length()) {
- // PASS
- }
-
- ~StreamInformationImpl();
-
- StreamKind getKind() const override {
- return kind;
- }
-
- uint64_t getColumnId() const override {
- return column;
- }
-
- uint64_t getOffset() const override {
- return offset;
- }
-
- uint64_t getLength() const override {
- return length;
- }
- };
-
- StreamInformationImpl::~StreamInformationImpl() {
- // PASS
+ std::ostringstream columnStream;
+ std::copy(columns.begin(), columns.end(),
+ std::ostream_iterator<std::string>(columnStream, "."));
+ std::string columnPath = columnStream.str();
+ return columnPath.substr(0, columnPath.length() - 1);
}
- class StripeInformationImpl : public StripeInformation {
- uint64_t offset;
- uint64_t indexLength;
- uint64_t dataLength;
- uint64_t footerLength;
- uint64_t numRows;
- InputStream* stream;
- MemoryPool& memory;
- CompressionKind compression;
- uint64_t blockSize;
- mutable std::unique_ptr<proto::StripeFooter> stripeFooter;
- void ensureStripeFooterLoaded() const;
- public:
-
- StripeInformationImpl(uint64_t _offset,
- uint64_t _indexLength,
- uint64_t _dataLength,
- uint64_t _footerLength,
- uint64_t _numRows,
- InputStream* _stream,
- MemoryPool& _memory,
- CompressionKind _compression,
- uint64_t _blockSize
- ) : offset(_offset),
- indexLength(_indexLength),
- dataLength(_dataLength),
- footerLength(_footerLength),
- numRows(_numRows),
- stream(_stream),
- memory(_memory),
- compression(_compression),
- blockSize(_blockSize) {
- // PASS
- }
-
- virtual ~StripeInformationImpl() {
- // PASS
- }
-
- uint64_t getOffset() const override {
- return offset;
- }
-
- uint64_t getLength() const override {
- return indexLength + dataLength + footerLength;
- }
- uint64_t getIndexLength() const override {
- return indexLength;
- }
-
- uint64_t getDataLength()const override {
- return dataLength;
- }
-
- uint64_t getFooterLength() const override {
- return footerLength;
- }
-
- uint64_t getNumberOfRows() const override {
- return numRows;
- }
-
- uint64_t getNumberOfStreams() const override {
- ensureStripeFooterLoaded();
- return static_cast<uint64_t>(stripeFooter->streams_size());
- }
-
- std::unique_ptr<StreamInformation> getStreamInformation(uint64_t streamId
- ) const override;
-
- ColumnEncodingKind getColumnEncoding(uint64_t colId) const override {
- ensureStripeFooterLoaded();
- return static_cast<ColumnEncodingKind>(stripeFooter->
- columns(static_cast<int>(colId))
- .kind());
- }
-
- uint64_t getDictionarySize(uint64_t colId) const override {
- ensureStripeFooterLoaded();
- return static_cast<ColumnEncodingKind>(stripeFooter->
- columns(static_cast<int>(colId))
- .dictionarysize());
- }
- const std::string& getWriterTimezone() const override {
- ensureStripeFooterLoaded();
- return stripeFooter->writertimezone();
- }
- };
-
- void StripeInformationImpl::ensureStripeFooterLoaded() const {
- if (stripeFooter.get() == nullptr) {
- std::unique_ptr<SeekableInputStream> pbStream =
- createDecompressor(compression,
- std::unique_ptr<SeekableInputStream>
- (new SeekableFileInputStream(stream,
- offset +
- indexLength +
- dataLength,
- footerLength,
- memory)),
- blockSize,
- memory);
- stripeFooter.reset(new proto::StripeFooter());
- if (!stripeFooter->ParseFromZeroCopyStream(pbStream.get())) {
- throw ParseError("Failed to parse the stripe footer");
+ void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type) {
+ size_t id = static_cast<size_t>(type.getColumnId());
+ if (!selectedColumns[id]) {
+ selectedColumns[id] = true;
+ for(size_t c = id; c <= type.getMaximumColumnId(); ++c){
+ selectedColumns[c] = true;
}
}
}
- std::unique_ptr<StreamInformation>
- StripeInformationImpl::getStreamInformation(uint64_t streamId) const {
- ensureStripeFooterLoaded();
- uint64_t streamOffset = offset;
- for(uint64_t s=0; s < streamId; ++s) {
- streamOffset += stripeFooter->streams(static_cast<int>(s)).length();
- }
- return ORC_UNIQUE_PTR<StreamInformation>
- (new StreamInformationImpl(streamOffset,
- stripeFooter->
- streams(static_cast<int>(streamId))));
- }
-
- ColumnStatistics* convertColumnStatistics(const proto::ColumnStatistics& s,
- bool correctStats) {
- if (s.has_intstatistics()) {
- return new IntegerColumnStatisticsImpl(s);
- } else if (s.has_doublestatistics()) {
- return new DoubleColumnStatisticsImpl(s);
- } else if (s.has_stringstatistics()) {
- return new StringColumnStatisticsImpl(s, correctStats);
- } else if (s.has_bucketstatistics()) {
- return new BooleanColumnStatisticsImpl(s, correctStats);
- } else if (s.has_decimalstatistics()) {
- return new DecimalColumnStatisticsImpl(s, correctStats);
- } else if (s.has_timestampstatistics()) {
- return new TimestampColumnStatisticsImpl(s, correctStats);
- } else if (s.has_datestatistics()) {
- return new DateColumnStatisticsImpl(s, correctStats);
- } else if (s.has_binarystatistics()) {
- return new BinaryColumnStatisticsImpl(s, correctStats);
- } else {
- return new ColumnStatisticsImpl(s);
- }
- }
-
- Statistics::~Statistics() {
- // PASS
- }
-
- class StatisticsImpl: public Statistics {
- private:
- std::list<ColumnStatistics*> colStats;
-
- // DELIBERATELY NOT IMPLEMENTED
- StatisticsImpl(const StatisticsImpl&);
- StatisticsImpl& operator=(const StatisticsImpl&);
-
- public:
- StatisticsImpl(const proto::StripeStatistics& stripeStats, bool correctStats) {
- for(int i = 0; i < stripeStats.colstats_size(); i++) {
- colStats.push_back(convertColumnStatistics
- (stripeStats.colstats(i), correctStats));
- }
- }
-
- StatisticsImpl(const proto::Footer& footer, bool correctStats) {
- for(int i = 0; i < footer.statistics_size(); i++) {
- colStats.push_back(convertColumnStatistics
- (footer.statistics(i), correctStats));
- }
- }
-
- virtual const ColumnStatistics* getColumnStatistics(uint32_t columnId
- ) const override {
- std::list<ColumnStatistics*>::const_iterator it = colStats.begin();
- std::advance(it, static_cast<int64_t>(columnId));
- return *it;
- }
-
- virtual ~StatisticsImpl();
-
- uint32_t getNumberOfColumns() const override {
- return static_cast<uint32_t>(colStats.size());
- }
- };
-
- StatisticsImpl::~StatisticsImpl() {
- for(std::list<ColumnStatistics*>::iterator ptr = colStats.begin();
- ptr != colStats.end();
- ++ptr) {
- delete *ptr;
+ /**
+ * Recurses over a type tree and selects the parents of every selected type.
+ * @return true if this type or any of its descendants was selected.
+ */
+ bool ColumnSelector::selectParents(std::vector<bool>& selectedColumns, const Type& type) {
+ size_t id = static_cast<size_t>(type.getColumnId());
+ bool result = selectedColumns[id];
+ for(uint64_t c=0; c < type.getSubtypeCount(); ++c) {
+ result |= selectParents(selectedColumns, *type.getSubtype(c));
}
+ selectedColumns[id] = result;
+ return result;
}
- Reader::~Reader() {
- // PASS
- }
-
- static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024;
-
- class ReaderImpl : public Reader {
- private:
- const Timezone& localTimezone;
-
- // inputs
- std::unique_ptr<InputStream> stream;
- ReaderOptions options;
- const uint64_t fileLength;
- const uint64_t postscriptLength;
- std::vector<bool> selectedColumns;
-
- // custom memory pool
- MemoryPool& memoryPool;
-
- // postscript
- std::unique_ptr<proto::PostScript> postscript;
- const uint64_t blockSize;
- const CompressionKind compression;
-
- // footer
- std::unique_ptr<proto::Footer> footer;
- DataBuffer<uint64_t> firstRowOfStripe;
- uint64_t numberOfStripes;
- std::unique_ptr<Type> schema;
- mutable std::unique_ptr<Type> selectedSchema;
-
- // metadata
- mutable std::unique_ptr<proto::Metadata> metadata;
- mutable bool isMetadataLoaded;
-
- // reading state
- uint64_t previousRow;
- uint64_t firstStripe;
- uint64_t currentStripe;
- uint64_t lastStripe; // the stripe AFTER the last one
- uint64_t currentRowInStripe;
- uint64_t rowsInCurrentStripe;
- proto::StripeInformation currentStripeInfo;
- proto::StripeFooter currentStripeFooter;
- std::unique_ptr<ColumnReader> reader;
- std::map<std::string, uint64_t> nameIdMap;
- std::map<uint64_t, const Type*> idTypeMap;
-
- // internal methods
- proto::StripeFooter getStripeFooter(const proto::StripeInformation& info);
- void startNextStripe();
- void checkOrcVersion();
- void readMetadata() const;
-
- // build map from type name and id, id to Type
- void buildTypeNameIdMap(const Type* type, std::vector<std::string>& columns);
- std::string toDotColumnPath(const std::vector<std::string>& columns);
-
- // Select the columns from the options object
- void updateSelected();
-
- // Select a field by name
- void updateSelectedByName(const std::string& name);
- // Select a field by id
- void updateSelectedByFieldId(uint64_t fieldId);
- // Select a type by id
- void updateSelectedByTypeId(uint64_t typeId);
-
- // Select all of the recursive children of the given type.
- void selectChildren(const Type& type);
-
- // For each child of type, select it if one of its children
- // is selected.
- bool selectParents(const Type& type);
-
- public:
- /**
- * Constructor that lets the user specify additional options.
- * @param stream the stream to read from
- * @param options options for reading
- * @param postscript the postscript for the file
- * @param footer the footer for the file
- * @param fileLength the length of the file in bytes
- * @param postscriptLength the length of the postscript in bytes
- */
- ReaderImpl(std::unique_ptr<InputStream> stream,
- const ReaderOptions& options,
- std::unique_ptr<proto::PostScript> postscript,
- std::unique_ptr<proto::Footer> footer,
- uint64_t fileLength,
- uint64_t postscriptLength);
-
- const ReaderOptions& getReaderOptions() const;
-
- CompressionKind getCompression() const override;
-
- std::string getFormatVersion() const override;
-
- WriterVersion getWriterVersion() const override;
-
- uint64_t getNumberOfRows() const override;
-
- uint64_t getRowIndexStride() const override;
-
- const std::string& getStreamName() const override;
-
- std::list<std::string> getMetadataKeys() const override;
-
- std::string getMetadataValue(const std::string& key) const override;
-
- bool hasMetadataValue(const std::string& key) const override;
-
- uint64_t getCompressionSize() const override;
-
- uint64_t getNumberOfStripes() const override;
-
- std::unique_ptr<StripeInformation> getStripe(uint64_t
- ) const override;
-
- uint64_t getNumberOfStripeStatistics() const override;
-
- std::unique_ptr<Statistics>
- getStripeStatistics(uint64_t stripeIndex) const override;
-
-
- uint64_t getContentLength() const override;
- uint64_t getStripeStatisticsLength() const override;
- uint64_t getFileFooterLength() const override;
- uint64_t getFilePostscriptLength() const override;
- uint64_t getFileLength() const override;
-
- std::unique_ptr<Statistics> getStatistics() const override;
-
- std::unique_ptr<ColumnStatistics> getColumnStatistics(uint32_t columnId
- ) const override;
-
- const Type& getType() const override;
-
- const Type& getSelectedType() const override;
-
- const std::vector<bool> getSelectedColumns() const override;
-
- std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size
- ) const override;
-
- bool next(ColumnVectorBatch& data) override;
-
- uint64_t getRowNumber() const override;
-
- void seekToRow(uint64_t rowNumber) override;
-
- MemoryPool* getMemoryPool() const ;
-
- bool hasCorrectStatistics() const override;
+ /**
+ * Recurses over a type tree and builds two maps
+ * map<TypeName, TypeId>, map<TypeId, Type>
+ */
+ void ColumnSelector::buildTypeNameIdMap(const Type* type) {
+ // map<type_id, Type*>
+ idTypeMap[type->getColumnId()] = type;
- std::string getSerializedFileTail() const override;
+ if (STRUCT == type->getKind()) {
+ for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
+ const std::string& fieldName = type->getFieldName(i);
+ columns.push_back(fieldName);
+ nameIdMap[toDotColumnPath()] = type->getSubtype(i)->getColumnId();
+ buildTypeNameIdMap(type->getSubtype(i));
+ columns.pop_back();
+ }
+ } else {
+ // other non-primitive type
+ for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
+ buildTypeNameIdMap(type->getSubtype(j));
+ }
+ }
+ }
- uint64_t getMemoryUse(int stripeIx = -1) override;
- };
+ void ColumnSelector::updateSelected(std::vector<bool>& selectedColumns, const RowReaderOptions& options) {
+ selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
+ if (contents->schema->getKind() == STRUCT && options.getIndexesSet()) {
+ for(std::list<uint64_t>::const_iterator field = options.getInclude().begin();
+ field != options.getInclude().end(); ++field) {
+ updateSelectedByFieldId(selectedColumns, *field);
+ }
+ } else if (contents->schema->getKind() == STRUCT && options.getNamesSet()) {
+ for(std::list<std::string>::const_iterator field = options.getIncludeNames().begin();
+ field != options.getIncludeNames().end(); ++field) {
+ updateSelectedByName(selectedColumns, *field);
+ }
+ } else if (options.getTypeIdsSet()) {
+ for(std::list<uint64_t>::const_iterator typeId = options.getInclude().begin();
+ typeId != options.getInclude().end(); ++typeId) {
+ updateSelectedByTypeId(selectedColumns, *typeId);
+ }
+ } else {
+ // default is to select all columns
+ std::fill(selectedColumns.begin(), selectedColumns.end(), true);
+ }
+ selectParents(selectedColumns, *contents->schema.get());
+ selectedColumns[0] = true; // column 0 is selected by default
+ }
- InputStream::~InputStream() {
- // PASS
- };
+ void ColumnSelector::updateSelectedByFieldId(std::vector<bool>& selectedColumns, uint64_t fieldId) {
+ if (fieldId < contents->schema->getSubtypeCount()) {
+ selectChildren(selectedColumns, *contents->schema->getSubtype(fieldId));
+ } else {
+ std::stringstream buffer;
+ buffer << "Invalid column selected " << fieldId << " out of "
+ << contents->schema->getSubtypeCount();
+ throw ParseError(buffer.str());
+ }
+ }
- uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
- if (ps.has_compressionblocksize()) {
- return ps.compressionblocksize();
+ void ColumnSelector::updateSelectedByTypeId(std::vector<bool>& selectedColumns, uint64_t typeId) {
+ if (typeId < selectedColumns.size()) {
+ const Type& type = *idTypeMap[typeId];
+ selectChildren(selectedColumns, type);
} else {
- return 256 * 1024;
+ std::stringstream buffer;
+ buffer << "Invalid type id selected " << typeId << " out of "
+ << selectedColumns.size();
+ throw ParseError(buffer.str());
}
}
- CompressionKind convertCompressionKind(const proto::PostScript& ps) {
- if (ps.has_compression()) {
- return static_cast<CompressionKind>(ps.compression());
+ void ColumnSelector::updateSelectedByName(std::vector<bool>& selectedColumns, const std::string& fieldName) {
+ std::map<std::string, uint64_t>::const_iterator ite = nameIdMap.find(fieldName);
+ if (ite != nameIdMap.end()) {
+ updateSelectedByTypeId(selectedColumns, ite->second);
} else {
- throw ParseError("Unknown compression type");
+ throw ParseError("Invalid column selected " + fieldName);
}
}
- ReaderImpl::ReaderImpl(std::unique_ptr<InputStream> input,
- const ReaderOptions& opts,
- std::unique_ptr<proto::PostScript> _postscript,
- std::unique_ptr<proto::Footer> _footer,
- uint64_t _fileLength,
- uint64_t _postscriptLength
+ ColumnSelector::ColumnSelector(const FileContents* _contents): contents(_contents) {
+ buildTypeNameIdMap(contents->schema.get());
+ }
+
+ RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents,
+ const RowReaderOptions& opts
): localTimezone(getLocalTimezone()),
- stream(std::move(input)),
+ contents(_contents),
options(opts),
- fileLength(_fileLength),
- postscriptLength(_postscriptLength),
memoryPool(*opts.getMemoryPool()),
- postscript(std::move(_postscript)),
- blockSize(getCompressionBlockSize(*postscript)),
- compression(convertCompressionKind(*postscript)),
- footer(std::move(_footer)),
+ footer(contents->footer.get()),
firstRowOfStripe(memoryPool, 0) {
- isMetadataLoaded = false;
- checkOrcVersion();
+ uint64_t numberOfStripes;
numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
- currentStripe = static_cast<uint64_t>(footer->stripes_size());
+ currentStripe = numberOfStripes;
lastStripe = 0;
currentRowInStripe = 0;
uint64_t rowTotal = 0;
- firstRowOfStripe.resize(static_cast<uint64_t>(footer->stripes_size()));
- for(size_t i=0; i < static_cast<size_t>(footer->stripes_size()); ++i) {
+ firstRowOfStripe.resize(numberOfStripes);
+ for(size_t i=0; i < numberOfStripes; ++i) {
firstRowOfStripe[i] = rowTotal;
proto::StripeInformation stripeInfo =
footer->stripes(static_cast<int>(i));
@@ -1306,48 +248,130 @@ namespace orc {
if (currentStripe == 0) {
previousRow = (std::numeric_limits<uint64_t>::max)();
- } else if (currentStripe ==
- static_cast<uint64_t>(footer->stripes_size())) {
+ } else if (currentStripe == numberOfStripes) {
previousRow = footer->numberofrows();
} else {
previousRow = firstRowOfStripe[firstStripe]-1;
}
- schema = convertType(footer->types(0), *footer);
- std::vector<std::string> columns;
- buildTypeNameIdMap(schema.get(), columns);
- updateSelected();
+ ColumnSelector column_selector(contents.get());
+ column_selector.updateSelected(selectedColumns, options);
}
- void ReaderImpl::updateSelected() {
- selectedColumns.assign(static_cast<size_t>(footer->types_size()), false);
- if (schema->getKind() == STRUCT && options.getIndexesSet()) {
- for(std::list<uint64_t>::const_iterator field = options.getInclude().begin();
- field != options.getInclude().end(); ++field) {
- updateSelectedByFieldId(*field);
- }
- } else if (schema->getKind() == STRUCT && options.getNamesSet()) {
- for(std::list<std::string>::const_iterator field = options.getIncludeNames().begin();
- field != options.getIncludeNames().end(); ++field) {
- updateSelectedByName(*field);
- }
- } else if (options.getTypeIdsSet()) {
- for(std::list<uint64_t>::const_iterator typeId = options.getInclude().begin();
- typeId != options.getInclude().end(); ++typeId) {
- updateSelectedByTypeId(*typeId);
- }
- } else {
- // default is to select all columns
- std::fill(selectedColumns.begin(), selectedColumns.end(), true);
+ const RowReaderOptions& RowReaderImpl::getRowReaderOptions() const {
+ return options;
+ }
+
+ CompressionKind RowReaderImpl::getCompression() const {
+ return contents->compression;
+ }
+
+ uint64_t RowReaderImpl::getCompressionSize() const {
+ return contents->blockSize;
+ }
+
+ const std::vector<bool> RowReaderImpl::getSelectedColumns() const {
+ return selectedColumns;
+ }
+
+ const Type& RowReaderImpl::getSelectedType() const {
+ if (selectedSchema.get() == nullptr) {
+ selectedSchema = buildSelectedType(contents->schema.get(),
+ selectedColumns);
}
- selectParents(*schema);
- selectedColumns[0] = true; // column 0 is selected by default
+ return *(selectedSchema.get());
+ }
+
+ uint64_t RowReaderImpl::getRowNumber() const {
+ return previousRow;
+ }
+
+ void RowReaderImpl::seekToRow(uint64_t rowNumber) {
+ // Empty file
+ if (lastStripe == 0) {
+ return;
+ }
+
+ // If we are reading only a portion of the file
+ // (bounded by firstStripe and lastStripe),
+ // seeking before or after the portion of interest should return no data.
+ // Implement this by setting previousRow to the number of rows in the file.
+
+ // seeking past lastStripe
+ uint64_t num_stripes = static_cast<uint64_t>(footer->stripes_size());
+ if ( (lastStripe == num_stripes
+ && rowNumber >= footer->numberofrows()) ||
+ (lastStripe < num_stripes
+ && rowNumber >= firstRowOfStripe[lastStripe]) ) {
+ currentStripe = num_stripes;
+ previousRow = footer->numberofrows();
+ return;
+ }
+
+ uint64_t seekToStripe = 0;
+ while (seekToStripe+1 < lastStripe &&
+ firstRowOfStripe[seekToStripe+1] <= rowNumber) {
+ seekToStripe++;
+ }
+
+ // seeking before the first stripe
+ if (seekToStripe < firstStripe) {
+ currentStripe = num_stripes;
+ previousRow = footer->numberofrows();
+ return;
+ }
+
+ currentStripe = seekToStripe;
+ currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
+ previousRow = rowNumber;
+ startNextStripe();
+ reader->skip(currentRowInStripe);
+ }
+
+ proto::StripeFooter RowReaderImpl::getStripeFooter
+ (const proto::StripeInformation& info) {
+ uint64_t stripeFooterStart = info.offset() + info.indexlength() +
+ info.datalength();
+ uint64_t stripeFooterLength = info.footerlength();
+ std::unique_ptr<SeekableInputStream> pbStream =
+ createDecompressor(contents->compression,
+ std::unique_ptr<SeekableInputStream>
+ (new SeekableFileInputStream(contents->stream.get(),
+ stripeFooterStart,
+ stripeFooterLength,
+ memoryPool)),
+ contents->blockSize,
+ memoryPool);
+ proto::StripeFooter result;
+ if (!result.ParseFromZeroCopyStream(pbStream.get())) {
+ throw ParseError(std::string("bad StripeFooter from ") +
+ pbStream->getName());
+ }
+ return result;
+ }
+
+ ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> _contents,
+ const ReaderOptions& opts,
+ uint64_t _fileLength,
+ uint64_t _postscriptLength
+ ): contents(std::move(_contents)),
+ options(opts),
+ fileLength(_fileLength),
+ postscriptLength(_postscriptLength),
+ memoryPool(*opts.getMemoryPool()),
+ footer(contents->footer.get()) {
+ isMetadataLoaded = false;
+ checkOrcVersion();
+ numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
+ contents->schema = REDUNDANT_MOVE(convertType(footer->types(0), *footer));
+ contents->blockSize = getCompressionBlockSize(*contents->postscript);
+ contents->compression= convertCompressionKind(*contents->postscript);
}
std::string ReaderImpl::getSerializedFileTail() const {
proto::FileTail tail;
proto::PostScript *mutable_ps = tail.mutable_postscript();
- mutable_ps->CopyFrom(*postscript);
+ mutable_ps->CopyFrom(*contents->postscript);
proto::Footer *mutableFooter = tail.mutable_footer();
mutableFooter->CopyFrom(*footer);
tail.set_filelength(fileLength);
@@ -1364,11 +388,11 @@ namespace orc {
}
CompressionKind ReaderImpl::getCompression() const {
- return compression;
+ return contents->compression;
}
uint64_t ReaderImpl::getCompressionSize() const {
- return blockSize;
+ return contents->blockSize;
}
uint64_t ReaderImpl::getNumberOfStripes() const {
@@ -1398,19 +422,19 @@ namespace orc {
stripeInfo.datalength(),
stripeInfo.footerlength(),
stripeInfo.numberofrows(),
- stream.get(),
+ contents->stream.get(),
memoryPool,
- compression,
- blockSize));
+ contents->compression,
+ contents->blockSize));
}
std::string ReaderImpl::getFormatVersion() const {
std::stringstream result;
- for(int i=0; i < postscript->version_size(); ++i) {
+ for(int i=0; i < contents->postscript->version_size(); ++i) {
if (i != 0) {
result << ".";
}
- result << postscript->version(i);
+ result << contents->postscript->version(i);
}
return result.str();
}
@@ -1420,10 +444,10 @@ namespace orc {
}
WriterVersion ReaderImpl::getWriterVersion() const {
- if (!postscript->has_writerversion()) {
+ if (!contents->postscript->has_writerversion()) {
return WriterVersion_ORIGINAL;
}
- return static_cast<WriterVersion>(postscript->writerversion());
+ return static_cast<WriterVersion>(contents->postscript->writerversion());
}
uint64_t ReaderImpl::getContentLength() const {
@@ -1431,11 +455,11 @@ namespace orc {
}
uint64_t ReaderImpl::getStripeStatisticsLength() const {
- return postscript->metadatalength();
+ return contents->postscript->metadatalength();
}
uint64_t ReaderImpl::getFileFooterLength() const {
- return postscript->footerlength();
+ return contents->postscript->footerlength();
}
uint64_t ReaderImpl::getFilePostscriptLength() const {
@@ -1451,7 +475,7 @@ namespace orc {
}
const std::string& ReaderImpl::getStreamName() const {
- return stream->getName();
+ return contents->stream->getName();
}
std::list<std::string> ReaderImpl::getMetadataKeys() const {
@@ -1480,24 +504,22 @@ namespace orc {
return false;
}
- const std::vector<bool> ReaderImpl::getSelectedColumns() const {
- return selectedColumns;
- }
-
const Type& ReaderImpl::getType() const {
- return *(schema.get());
+ return *(contents->schema.get());
}
- const Type& ReaderImpl::getSelectedType() const {
- if (selectedSchema.get() == nullptr) {
- selectedSchema = buildSelectedType(schema.get(),
- selectedColumns);
+ std::unique_ptr<Statistics>
+ ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const {
+ if (!isMetadataLoaded) {
+ readMetadata();
}
- return *(selectedSchema.get());
- }
-
- uint64_t ReaderImpl::getRowNumber() const {
- return previousRow;
+ if (metadata.get() == nullptr) {
+ throw std::logic_error("No stripe statistics in file");
+ }
+ return std::unique_ptr<Statistics>
+ (new StatisticsImpl(metadata->stripestats
+ (static_cast<int>(stripeIndex)),
+ hasCorrectStatistics()));
}
std::unique_ptr<Statistics> ReaderImpl::getStatistics() const {
@@ -1518,18 +540,18 @@ namespace orc {
}
void ReaderImpl::readMetadata() const {
- uint64_t metadataSize = postscript->metadatalength();
+ uint64_t metadataSize = contents->postscript->metadatalength();
uint64_t metadataStart = fileLength - metadataSize
- - postscript->footerlength() - postscriptLength - 1;
+ - contents->postscript->footerlength() - postscriptLength - 1;
if (metadataSize != 0) {
std::unique_ptr<SeekableInputStream> pbStream =
- createDecompressor(compression,
+ createDecompressor(contents->compression,
std::unique_ptr<SeekableInputStream>
- (new SeekableFileInputStream(stream.get(),
+ (new SeekableFileInputStream(contents->stream.get(),
metadataStart,
metadataSize,
memoryPool)),
- blockSize,
+ contents->blockSize,
memoryPool);
metadata.reset(new proto::Metadata());
if (!metadata->ParseFromZeroCopyStream(pbStream.get())) {
@@ -1537,125 +559,26 @@ namespace orc {
}
}
isMetadataLoaded = true;
- }
-
- std::unique_ptr<Statistics>
- ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const {
- if (!isMetadataLoaded) {
- readMetadata();
- }
- if (metadata.get() == nullptr) {
- throw std::logic_error("No stripe statistics in file");
- }
- return std::unique_ptr<Statistics>
- (new StatisticsImpl(metadata->stripestats
- (static_cast<int>(stripeIndex)),
- hasCorrectStatistics()));
- }
-
-
- void ReaderImpl::seekToRow(uint64_t rowNumber) {
- // Empty file
- if (lastStripe == 0) {
- return;
- }
-
- // If we are reading only a portion of the file
- // (bounded by firstStripe and lastStripe),
- // seeking before or after the portion of interest should return no data.
- // Implement this by setting previousRow to the number of rows in the file.
-
- // seeking past lastStripe
- if ( (lastStripe == static_cast<uint64_t>(footer->stripes_size())
- && rowNumber >= footer->numberofrows()) ||
- (lastStripe < static_cast<uint64_t>(footer->stripes_size())
- && rowNumber >= firstRowOfStripe[lastStripe]) ) {
- currentStripe = static_cast<uint64_t>(footer->stripes_size());
- previousRow = footer->numberofrows();
- return;
- }
-
- uint64_t seekToStripe = 0;
- while (seekToStripe+1 < lastStripe &&
- firstRowOfStripe[seekToStripe+1] <= rowNumber) {
- seekToStripe++;
- }
-
- // seeking before the first stripe
- if (seekToStripe < firstStripe) {
- currentStripe = static_cast<uint64_t>(footer->stripes_size());
- previousRow = footer->numberofrows();
- return;
- }
-
- currentStripe = seekToStripe;
- currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
- previousRow = rowNumber;
- startNextStripe();
- reader->skip(currentRowInStripe);
- }
-
- bool ReaderImpl::hasCorrectStatistics() const {
- return getWriterVersion() != WriterVersion_ORIGINAL;
- }
-
- proto::StripeFooter ReaderImpl::getStripeFooter
- (const proto::StripeInformation& info) {
- uint64_t stripeFooterStart = info.offset() + info.indexlength() +
- info.datalength();
- uint64_t stripeFooterLength = info.footerlength();
- std::unique_ptr<SeekableInputStream> pbStream =
- createDecompressor(compression,
- std::unique_ptr<SeekableInputStream>
- (new SeekableFileInputStream(stream.get(),
- stripeFooterStart,
- stripeFooterLength,
- memoryPool)),
- blockSize,
- memoryPool);
- proto::StripeFooter result;
- if (!result.ParseFromZeroCopyStream(pbStream.get())) {
- throw ParseError(std::string("bad StripeFooter from ") +
- pbStream->getName());
- }
- return result;
- }
-
- class StripeStreamsImpl: public StripeStreams {
- private:
- const ReaderImpl& reader;
- const proto::StripeFooter& footer;
- const uint64_t stripeStart;
- InputStream& input;
- MemoryPool& memoryPool;
- const Timezone& writerTimezone;
-
- public:
- StripeStreamsImpl(const ReaderImpl& reader,
- const proto::StripeFooter& footer,
- uint64_t stripeStart,
- InputStream& input,
- MemoryPool& memoryPool,
- const Timezone& writerTimezone);
-
- virtual ~StripeStreamsImpl();
-
- virtual const ReaderOptions& getReaderOptions() const override;
-
- virtual const std::vector<bool> getSelectedColumns() const override;
-
- virtual proto::ColumnEncoding getEncoding(uint64_t columnId
- ) const override;
+ }
- virtual std::unique_ptr<SeekableInputStream>
- getStream(uint64_t columnId,
- proto::Stream_Kind kind,
- bool shouldStream) const override;
+ bool ReaderImpl::hasCorrectStatistics() const {
+ return getWriterVersion() != WriterVersion_ORIGINAL;
+ }
- MemoryPool& getMemoryPool() const override;
+ void ReaderImpl::checkOrcVersion() {
+ std::string version = getFormatVersion();
+ if (version != "0.11" && version != "0.12") {
+ *(options.getErrorStream())
+ << "Warning: ORC file " << contents->stream->getName()
+ << " was written in an unknown format version "
+ << version << "\n";
+ }
+ }
- const Timezone& getWriterTimezone() const override;
- };
+ std::unique_ptr<RowReader> ReaderImpl::getRowReader(
+ const RowReaderOptions& opts) const {
+ return std::unique_ptr<RowReader>(new RowReaderImpl(contents, opts));
+ }
uint64_t maxStreamsForType(const proto::Type& type) {
switch (static_cast<int64_t>(type.kind())) {
@@ -1687,6 +610,66 @@ namespace orc {
}
uint64_t ReaderImpl::getMemoryUse(int stripeIx) {
+ std::vector<bool> selectedColumns;
+ selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), true);
+ return getMemoryUse(stripeIx, selectedColumns);
+ }
+
+ uint64_t ReaderImpl::getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx) {
+ std::vector<bool> selectedColumns;
+ selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
+ ColumnSelector column_selector(contents.get());
+ if (contents->schema->getKind() == STRUCT && include.begin() != include.end()) {
+ for(std::list<uint64_t>::const_iterator field = include.begin();
+ field != include.end(); ++field) {
+ column_selector.updateSelectedByFieldId(selectedColumns, *field);
+ }
+ } else {
+ // default is to select all columns
+ std::fill(selectedColumns.begin(), selectedColumns.end(), true);
+ }
+ column_selector.selectParents(selectedColumns, *contents->schema.get());
+ selectedColumns[0] = true; // column 0 is selected by default
+ return getMemoryUse(stripeIx, selectedColumns);
+ }
+
+ uint64_t ReaderImpl::getMemoryUseByName(const std::list<std::string>& names, int stripeIx) {
+ std::vector<bool> selectedColumns;
+ selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
+ ColumnSelector column_selector(contents.get());
+ if (contents->schema->getKind() == STRUCT && names.begin() != names.end()) {
+ for(std::list<std::string>::const_iterator field = names.begin();
+ field != names.end(); ++field) {
+ column_selector.updateSelectedByName(selectedColumns, *field);
+ }
+ } else {
+ // default is to select all columns
+ std::fill(selectedColumns.begin(), selectedColumns.end(), true);
+ }
+ column_selector.selectParents(selectedColumns, *contents->schema.get());
+ selectedColumns[0] = true; // column 0 is selected by default
+ return getMemoryUse(stripeIx, selectedColumns);
+ }
+
+ uint64_t ReaderImpl::getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx) {
+ std::vector<bool> selectedColumns;
+ selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
+ ColumnSelector column_selector(contents.get());
+ if (include.begin() != include.end()) {
+ for(std::list<uint64_t>::const_iterator field = include.begin();
+ field != include.end(); ++field) {
+ column_selector.updateSelectedByTypeId(selectedColumns, *field);
+ }
+ } else {
+ // default is to select all columns
+ std::fill(selectedColumns.begin(), selectedColumns.end(), true);
+ }
+ column_selector.selectParents(selectedColumns, *contents->schema.get());
+ selectedColumns[0] = true; // column 0 is selected by default
+ return getMemoryUse(stripeIx, selectedColumns);
+ }
+
+ uint64_t ReaderImpl::getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns) {
uint64_t maxDataLength = 0;
if (stripeIx >= 0 && stripeIx < footer->stripes_size()) {
@@ -1732,29 +715,29 @@ namespace orc {
*/
uint64_t memory = hasStringColumn ? 2 * maxDataLength :
std::min(uint64_t(maxDataLength),
- nSelectedStreams * stream->getNaturalReadSize());
+ nSelectedStreams * contents->stream->getNaturalReadSize());
// Do we need even more memory to read the footer or the metadata?
- if (memory < postscript->footerlength() + DIRECTORY_SIZE_GUESS) {
- memory = postscript->footerlength() + DIRECTORY_SIZE_GUESS;
+ if (memory < contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS) {
+ memory = contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS;
}
- if (memory < postscript->metadatalength()) {
- memory = postscript->metadatalength();
+ if (memory < contents->postscript->metadatalength()) {
+ memory = contents->postscript->metadatalength();
}
// Account for firstRowOfStripe.
- memory += firstRowOfStripe.capacity() * sizeof(uint64_t);
+ memory += static_cast<uint64_t>(footer->stripes_size()) * sizeof(uint64_t);
// Decompressors need buffers for each stream
uint64_t decompressorMemory = 0;
- if (compression != CompressionKind_NONE) {
+ if (contents->compression != CompressionKind_NONE) {
for (int i=0; i < footer->types_size(); i++) {
if (selectedColumns[static_cast<size_t>(i)]) {
const proto::Type& type = footer->types(i);
- decompressorMemory += maxStreamsForType(type) * blockSize;
+ decompressorMemory += maxStreamsForType(type) * contents->blockSize;
}
}
- if (compression == CompressionKind_SNAPPY) {
+ if (contents->compression == CompressionKind_SNAPPY) {
decompressorMemory *= 2; // Snappy decompressor uses a second buffer
}
}
@@ -1762,75 +745,7 @@ namespace orc {
return memory + decompressorMemory ;
}
- StripeStreamsImpl::StripeStreamsImpl(const ReaderImpl& _reader,
- const proto::StripeFooter& _footer,
- uint64_t _stripeStart,
- InputStream& _input,
- MemoryPool& _memoryPool,
- const Timezone& _writerTimezone
- ): reader(_reader),
- footer(_footer),
- stripeStart(_stripeStart),
- input(_input),
- memoryPool(_memoryPool),
- writerTimezone(_writerTimezone) {
- // PASS
- }
-
- StripeStreamsImpl::~StripeStreamsImpl() {
- // PASS
- }
-
- const ReaderOptions& StripeStreamsImpl::getReaderOptions() const {
- return reader.getReaderOptions();
- }
-
- const std::vector<bool> StripeStreamsImpl::getSelectedColumns() const {
- return reader.getSelectedColumns();
- }
-
- proto::ColumnEncoding StripeStreamsImpl::getEncoding(uint64_t columnId
- ) const {
- return footer.columns(static_cast<int>(columnId));
- }
-
- const Timezone& StripeStreamsImpl::getWriterTimezone() const {
- return writerTimezone;
- }
-
- std::unique_ptr<SeekableInputStream>
- StripeStreamsImpl::getStream(uint64_t columnId,
- proto::Stream_Kind kind,
- bool shouldStream) const {
- uint64_t offset = stripeStart;
- for(int i = 0; i < footer.streams_size(); ++i) {
- const proto::Stream& stream = footer.streams(i);
- if (stream.has_kind() &&
- stream.kind() == kind &&
- stream.column() == static_cast<uint64_t>(columnId)) {
- uint64_t myBlock = shouldStream ? input.getNaturalReadSize():
- stream.length();
- return createDecompressor(reader.getCompression(),
- std::unique_ptr<SeekableInputStream>
- (new SeekableFileInputStream
- (&input,
- offset,
- stream.length(),
- memoryPool,
- myBlock)),
- reader.getCompressionSize(),
- memoryPool);
- }
- offset += stream.length();
- }
- return std::unique_ptr<SeekableInputStream>();
- }
-
- MemoryPool& StripeStreamsImpl::getMemoryPool() const {
- return memoryPool;
- }
-
- void ReaderImpl::startNextStripe() {
+ void RowReaderImpl::startNextStripe() {
reader.reset(); // ColumnReaders use lots of memory; free old memory first
currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
currentStripeFooter = getStripeFooter(currentStripeInfo);
@@ -1841,23 +756,13 @@ namespace orc {
localTimezone;
StripeStreamsImpl stripeStreams(*this, currentStripeFooter,
currentStripeInfo.offset(),
- *(stream.get()),
+ *(contents->stream.get()),
memoryPool,
writerTimezone);
- reader = buildReader(*(schema.get()), stripeStreams);
- }
-
- void ReaderImpl::checkOrcVersion() {
- std::string version = getFormatVersion();
- if (version != "0.11" && version != "0.12") {
- *(options.getErrorStream())
- << "Warning: ORC file " << stream->getName()
- << " was written in an unknown format version "
- << version << "\n";
- }
+ reader = buildReader(*contents->schema.get(), stripeStreams);
}
- bool ReaderImpl::next(ColumnVectorBatch& data) {
+ bool RowReaderImpl::next(ColumnVectorBatch& data) {
if (currentStripe >= lastStripe) {
data.numElements = 0;
if (lastStripe > 0) {
@@ -1886,7 +791,7 @@ namespace orc {
return rowsToRead != 0;
}
- std::unique_ptr<ColumnVectorBatch> ReaderImpl::createRowBatch
+ std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch
(uint64_t capacity) const {
return getSelectedType().createRowBatch(capacity, memoryPool);
}
@@ -1978,8 +883,7 @@ namespace orc {
std::unique_ptr<Reader> createReader(std::unique_ptr<InputStream> stream,
const ReaderOptions& options) {
MemoryPool *memoryPool = options.getMemoryPool();
- std::unique_ptr<proto::PostScript> ps;
- std::unique_ptr<proto::Footer> footer;
+ std::shared_ptr<FileContents> contents = std::shared_ptr<FileContents>(new FileContents());
std::string serializedFooter = options.getSerializedFileTail();
uint64_t fileLength;
uint64_t postscriptLength;
@@ -1989,8 +893,8 @@ namespace orc {
if (!tail.ParseFromString(serializedFooter)) {
throw ParseError("Failed to parse the file tail from string");
}
- ps.reset(new proto::PostScript(tail.postscript()));
- footer.reset(new proto::Footer(tail.footer()));
+ contents->postscript.reset(new proto::PostScript(tail.postscript()));
+ contents->footer.reset(new proto::Footer(tail.footer()));
fileLength = tail.filelength();
postscriptLength = tail.postscriptlength();
} else {
@@ -2007,8 +911,9 @@ namespace orc {
stream->read(buffer->data(), readSize, fileLength - readSize);
postscriptLength = buffer->data()[readSize - 1] & 0xff;
- ps = readPostscript(stream.get(), buffer, postscriptLength);
- uint64_t footerSize = ps->footerlength();
+ contents->postscript = REDUNDANT_MOVE(readPostscript(stream.get(),
+ buffer, postscriptLength));
+ uint64_t footerSize = contents->postscript->footerlength();
uint64_t tailSize = 1 + postscriptLength + footerSize;
uint64_t footerOffset;
@@ -2020,330 +925,69 @@ namespace orc {
footerOffset = readSize - tailSize;
}
- footer = readFooter(stream.get(), buffer, footerOffset, *ps,
- *memoryPool);
+ contents->footer = REDUNDANT_MOVE(readFooter(stream.get(), buffer,
+ footerOffset, *contents->postscript, *memoryPool));
delete buffer;
}
- return std::unique_ptr<Reader>(new ReaderImpl(std::move(stream),
+ contents->stream = std::move(stream);
+ return std::unique_ptr<Reader>(new ReaderImpl(std::move(contents),
options,
- std::move(ps),
- std::move(footer),
fileLength,
postscriptLength));
}
- ColumnStatistics::~ColumnStatistics() {
- // PASS
- }
-
- BinaryColumnStatistics::~BinaryColumnStatistics() {
- // PASS
- }
-
- BooleanColumnStatistics::~BooleanColumnStatistics() {
- // PASS
- }
-
- DateColumnStatistics::~DateColumnStatistics() {
- // PASS
- }
-
- DecimalColumnStatistics::~DecimalColumnStatistics() {
- // PASS
- }
-
- DoubleColumnStatistics::~DoubleColumnStatistics() {
- // PASS
- }
-
- IntegerColumnStatistics::~IntegerColumnStatistics() {
- // PASS
- }
-
- StringColumnStatistics::~StringColumnStatistics() {
- // PASS
- }
-
- TimestampColumnStatistics::~TimestampColumnStatistics() {
- // PASS
- }
-
- ColumnStatisticsImpl::~ColumnStatisticsImpl() {
- // PASS
- }
-
- BinaryColumnStatisticsImpl::~BinaryColumnStatisticsImpl() {
- // PASS
- }
-
- BooleanColumnStatisticsImpl::~BooleanColumnStatisticsImpl() {
- // PASS
- }
-
- DateColumnStatisticsImpl::~DateColumnStatisticsImpl() {
- // PASS
- }
-
- DecimalColumnStatisticsImpl::~DecimalColumnStatisticsImpl() {
- // PASS
+ std::string streamKindToString(StreamKind kind) {
+ switch (static_cast<int>(kind)) {
+ case StreamKind_PRESENT:
+ return "present";
+ case StreamKind_DATA:
+ return "data";
+ case StreamKind_LENGTH:
+ return "length";
+ case StreamKind_DICTIONARY_DATA:
+ return "dictionary";
+ case StreamKind_DICTIONARY_COUNT:
+ return "dictionary count";
+ case StreamKind_SECONDARY:
+ return "secondary";
+ case StreamKind_ROW_INDEX:
+ return "index";
+ case StreamKind_BLOOM_FILTER:
+ return "bloom";
+ }
+ std::stringstream buffer;
+ buffer << "unknown - " << kind;
+ return buffer.str();
}
- DoubleColumnStatisticsImpl::~DoubleColumnStatisticsImpl() {
- // PASS
+ std::string columnEncodingKindToString(ColumnEncodingKind kind) {
+ switch (static_cast<int>(kind)) {
+ case ColumnEncodingKind_DIRECT:
+ return "direct";
+ case ColumnEncodingKind_DICTIONARY:
+ return "dictionary";
+ case ColumnEncodingKind_DIRECT_V2:
+ return "direct rle2";
+ case ColumnEncodingKind_DICTIONARY_V2:
+ return "dictionary rle2";
+ }
+ std::stringstream buffer;
+ buffer << "unknown - " << kind;
+ return buffer.str();
}
- IntegerColumnStatisticsImpl::~IntegerColumnStatisticsImpl() {
+ RowReader::~RowReader() {
// PASS
}
- StringColumnStatisticsImpl::~StringColumnStatisticsImpl() {
+ Reader::~Reader() {
// PASS
}
- TimestampColumnStatisticsImpl::~TimestampColumnStatisticsImpl() {
+ InputStream::~InputStream() {  // empty out-of-line destructor body
// PASS
- }
-
- ColumnStatisticsImpl::ColumnStatisticsImpl
- (const proto::ColumnStatistics& pb) {
- valueCount = pb.numberofvalues();
- }
-
- BinaryColumnStatisticsImpl::BinaryColumnStatisticsImpl
- (const proto::ColumnStatistics& pb, bool correctStats){
- valueCount = pb.numberofvalues();
- if (!pb.has_binarystatistics() || !correctStats) {
- _hasTotalLength = false;
-
- totalLength = 0;
- }else{
- _hasTotalLength = pb.binarystatistics().has_sum();
- totalLength = static_cast<uint64_t>(pb.binarystatistics().sum());
- }
- }
-
- BooleanColumnStatisticsImpl::BooleanColumnStatisticsImpl
- (const proto::ColumnStatistics& pb, bool correctStats){
- valueCount = pb.numberofvalues();
- if (!pb.has_bucketstatistics() || !correctStats) {
- _hasCount = false;
- trueCount = 0;
- }else{
- _hasCount = true;
- trueCount = pb.bucketstatistics().count(0);
- }
- }
-
- DateColumnStatisticsImpl::DateColumnStatisticsImpl
- (const proto::ColumnStatistics& pb, bool correctStats){
- valueCount = pb.numberofvalues();
- if (!pb.has_datestatistics() || !correctStats) {
- _hasMinimum = false;
- _hasMaximum = false;
-
- minimum = 0;
- maximum = 0;
- } else {
- _hasMinimum = pb.datestatistics().has_minimum();
- _hasMaximum = pb.datestatistics().has_maximum();
- minimum = pb.datestatistics().minimum();
- maximum = pb.datestatistics().maximum();
- }
- }
-
- DecimalColumnStatisticsImpl::DecimalColumnStatisticsImpl
- (const proto::ColumnStatistics& pb, bool correctStats){
- valueCount = pb.numberofvalues();
- if (!pb.has_decimalstatistics() || !correctStats) {
- _hasMinimum = false;
- _hasMaximum = false;
- _hasSum = false;
- }else{
- const proto::DecimalStatistics& stats = pb.decimalstatistics();
- _hasMinimum = stats.has_minimum();
- _hasMaximum = stats.has_maximum();
- _hasSum = stats.has_sum();
-
- minimum = stats.minimum();
- maximum = stats.maximum();
- sum = stats.sum();
- }
- }
-
- DoubleColumnStatisticsImpl::DoubleColumnStatisticsImpl
- (const proto::ColumnStatistics& pb){
- valueCount = pb.numberofvalues();
- if (!pb.has_doublestatistics()) {
- _hasMinimum = false;
- _hasMaximum = false;
- _hasSum = false;
-
- minimum = 0;
- maximum = 0;
- sum = 0;
- }else{
- const proto::DoubleStatistics& stats = pb.doublestatistics();
- _hasMinimum = stats.has_minimum();
- _hasMaximum = stats.has_maximum();
- _hasSum = stats.has_sum();
-
- minimum = stats.minimum();
- maximum = stats.maximum();
- sum = stats.sum();
- }
- }
-
- IntegerColumnStatisticsImpl::IntegerColumnStatisticsImpl
- (const proto::ColumnStatistics& pb){
- valueCount = pb.numberofvalues();
- if (!pb.has_intstatistics()) {
- _hasMinimum = false;
- _hasMaximum = false;
- _hasSum = false;
-
- minimum = 0;
- maximum = 0;
- sum = 0;
- }else{
- const proto::IntegerStatistics& stats = pb.intstatistics();
- _hasMinimum = stats.has_minimum();
- _hasMaximum = stats.has_maximum();
- _hasSum = stats.has_sum();
-
- minimum = stats.minimum();
- maximum = stats.maximum();
- sum = stats.sum();
- }
- }
-
- StringColumnStatisticsImpl::StringColumnStatisticsImpl
- (const proto::ColumnStatistics& pb, bool correctStats){
- valueCount = pb.numberofvalues();
- if (!pb.has_stringstatistics() || !correctStats) {
- _hasMinimum = false;
- _hasMaximum = false;
- _hasTotalLength = false;
-
- totalLength = 0;
- }else{
- const proto::StringStatistics& stats = pb.stringstatistics();
- _hasMinimum = stats.has_minimum();
- _hasMaximum = stats.has_maximum();
- _hasTotalLength = stats.has_sum();
-
- minimum = stats.minimum();
- maximum = stats.maximum();
- totalLength = static_cast<uint64_t>(stats.sum());
- }
- }
-
- TimestampColumnStatisticsImpl::TimestampColumnStatisticsImpl
- (const proto::ColumnStatistics& pb, bool correctStats) {
- valueCount = pb.numberofvalues();
- if (!pb.has_timestampstatistics() || !correctStats) {
- _hasMinimum = false;
- _hasMaximum = false;
- minimum = 0;
- maximum = 0;
- }else{
- const proto::TimestampStatistics& stats = pb.timestampstatistics();
- _hasMinimum = stats.has_minimum();
- _hasMaximum = stats.has_maximum();
-
- minimum = stats.minimum();
- maximum = stats.maximum();
- }
- }
-
- void ReaderImpl::updateSelectedByFieldId(uint64_t fieldId) {
- if (fieldId < schema->getSubtypeCount()) {
- selectChildren(*schema->getSubtype(fieldId));
- } else {
- std::stringstream buffer;
- buffer << "Invalid column selected " << fieldId << " out of "
- << schema->getSubtypeCount();
- throw ParseError(buffer.str());
- }
- }
-
- void ReaderImpl::updateSelectedByTypeId(uint64_t typeId) {
- if (typeId < selectedColumns.size()) {
- const Type& type = *idTypeMap[typeId];
- selectChildren(type);
- } else {
- std::stringstream buffer;
- buffer << "Invalid type id selected " << typeId << " out of "
- << selectedColumns.size();
- throw ParseError(buffer.str());
- }
- }
-
- void ReaderImpl::updateSelectedByName(const std::string& fieldName) {
- std::map<std::string, uint64_t>::const_iterator ite = nameIdMap.find(fieldName);
- if (ite != nameIdMap.end()) {
- updateSelectedByTypeId(ite->second);
- } else {
- throw ParseError("Invalid column selected " + fieldName);
- }
- }
-
- void ReaderImpl::selectChildren(const Type& type) {
- size_t id = static_cast<size_t>(type.getColumnId());
- if (!selectedColumns[id]) {
- selectedColumns[id] = true;
- for(size_t c = id; c <= type.getMaximumColumnId(); ++c){
- selectedColumns[c] = true;
- }
- }
- }
-
- /**
- * Recurses over a type tree and selects the parents of every selected type.
- * @return true if any child was selected.
- */
- bool ReaderImpl::selectParents(const Type& type) {
- size_t id = static_cast<size_t>(type.getColumnId());
- bool result = selectedColumns[id];
- for(uint64_t c=0; c < type.getSubtypeCount(); ++c) {
- result |= selectParents(*type.getSubtype(c));
- }
- selectedColumns[id] = result;
- return result;
- }
-
- /**
- * Recurses over a type tree and build two maps
- * map<TypeName, TypeId>, map<TypeId, Type>
- */
- void ReaderImpl::buildTypeNameIdMap(const Type* type, std::vector<std::string>& columns) {
- // map<type_id, Type*>
- idTypeMap[type->getColumnId()] = type;
+ }
- if (orc::STRUCT == type->getKind()) {
- for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
- const std::string& fieldName = type->getFieldName(i);
- columns.push_back(fieldName);
- nameIdMap[toDotColumnPath(columns)] = type->getSubtype(i)->getColumnId();
- buildTypeNameIdMap(type->getSubtype(i), columns);
- columns.pop_back();
- }
- } else {
- // other non-primitive type
- for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
- buildTypeNameIdMap(type->getSubtype(j), columns);
- }
- }
- }
- std::string ReaderImpl::toDotColumnPath(const std::vector<std::string>& columns) {
- if (columns.empty()) {
- return std::string();
- }
- std::ostringstream columnStream;
- std::copy(columns.begin(), columns.end(),
- std::ostream_iterator<std::string>(columnStream, "."));
- std::string columnPath = columnStream.str();
- return columnPath.substr(0, columnPath.length() - 1);
- }
}// namespace
http://git-wip-us.apache.org/repos/asf/orc/blob/6bf63bb1/c++/src/Reader.hh
----------------------------------------------------------------------
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
new file mode 100644
index 0000000..3b2eac1
--- /dev/null
+++ b/c++/src/Reader.hh
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_READER_IMPL_HH
+#define ORC_READER_IMPL_HH
+
+#include "orc/Int128.hh"
+#include "orc/OrcFile.hh"
+#include "orc/Reader.hh"
+
+#include "ColumnReader.hh"
+#include "Exceptions.hh"
+#include "RLE.hh"
+#include "TypeImpl.hh"
+
+namespace orc {
+
+ class ReaderOptions;  // forward declarations for option/stripe types named below
+ class RowReaderOptions;
+ class StripeInformation;
+
+ static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024;  // 16 KiB; NOTE(review): presumably the initial tail-read size when opening a file -- confirm in Reader.cc
+
+ /**
+ * State shared between ReaderImpl and RowReaderImpl; both hold it through std::shared_ptr<FileContents>.
+ */
+ struct FileContents {
+ std::unique_ptr<InputStream> stream;  // input the file contents are read from
+ std::unique_ptr<proto::PostScript> postscript;  // file PostScript message
+ std::unique_ptr<proto::Footer> footer;  // file Footer message
+ std::unique_ptr<Type> schema;  // root of the file's type tree
+ uint64_t blockSize = 0;  // presumably the compression block size -- confirm where set; 0 until assigned
+ CompressionKind compression = CompressionKind_NONE;  // compression codec; NONE until assigned (never indeterminate)
+ };
+
+ class ReaderImpl;
+
+ class ColumnSelector {  // Resolves column selections (by name, field id or type id) into a std::vector<bool> of flags
+ private:
+ std::map<std::string, uint64_t> nameIdMap;  // presumably dotted column path -> type id
+ std::map<uint64_t, const Type*> idTypeMap;  // type id -> Type; raw pointers, presumably owned by contents->schema
+ const FileContents* contents;  // not owned; presumably must outlive this selector
+ std::vector<std::string> columns;  // presumably the path stack used by buildTypeNameIdMap/toDotColumnPath
+
+ // build the name -> type id and type id -> Type maps by walking the tree under type
+ void buildTypeNameIdMap(const Type* type);
+ std::string toDotColumnPath();
+
+ public:
+ // Select a field by name
+ void updateSelectedByName(std::vector<bool>& selectedColumns, const std::string& name);
+ // Select a field by id
+ void updateSelectedByFieldId(std::vector<bool>& selectedColumns, uint64_t fieldId);
+ // Select a type by id
+ void updateSelectedByTypeId(std::vector<bool>& selectedColumns, uint64_t typeId);
+
+ // Select all of the recursive children of the given type.
+ void selectChildren(std::vector<bool>& selectedColumns, const Type& type);
+
+ // Select type, recursively, if any of its descendants is selected;
+ // presumably returns whether anything under type ended up selected.
+ bool selectParents(std::vector<bool>& selectedColumns, const Type& type);
+ /**
+ * Builds a selector for contents; selection itself happens in the updateSelected* methods.
+ * @param contents of the file (not owned; stored as a raw pointer)
+ */
+ explicit ColumnSelector(const FileContents* contents);
+
+ // Select the columns from the RowReaderOptions object
+ void updateSelected(std::vector<bool>& selectedColumns, const RowReaderOptions& options);
+
+ // Select the columns from the ReaderOptions object
+ void updateSelected(std::vector<bool>& selectedColumns, const ReaderOptions& options);
+ };
+
+
+ class RowReaderImpl : public RowReader {  // RowReader implementation; shares FileContents via std::shared_ptr
+ private:
+ const Timezone& localTimezone;
+
+ // contents
+ std::shared_ptr<FileContents> contents;
+
+ // inputs
+ std::vector<bool> selectedColumns;
+ const RowReaderOptions options;  // copied, not referenced: a reference would dangle if the caller passed a temporary
+
+ // custom memory pool
+ MemoryPool& memoryPool;
+
+ // footer
+ proto::Footer* footer;  // not owned; presumably points into contents->footer
+ DataBuffer<uint64_t> firstRowOfStripe;
+ mutable std::unique_ptr<Type> selectedSchema;  // mutable: presumably built lazily by the const getSelectedType()
+
+ // reading state
+ uint64_t previousRow;
+ uint64_t firstStripe;
+ uint64_t currentStripe;
+ uint64_t lastStripe; // the stripe AFTER the last one
+ uint64_t currentRowInStripe;
+ uint64_t rowsInCurrentStripe;
+ proto::StripeInformation currentStripeInfo;
+ proto::StripeFooter currentStripeFooter;
+ std::unique_ptr<ColumnReader> reader;
+
+ // internal methods
+ proto::StripeFooter getStripeFooter(const proto::StripeInformation& info);
+ void startNextStripe();
+
+ public:
+ /**
+ * Constructor that lets the user specify additional options.
+ * @param contents of the file
+ * @param options options for reading (copied into the row reader)
+ */
+ RowReaderImpl(std::shared_ptr<FileContents> contents,
+ const RowReaderOptions& options);
+
+ // Select the columns from the options object
+ void updateSelected();
+ const std::vector<bool> getSelectedColumns() const override;
+
+ const Type& getSelectedType() const override;
+
+ std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size
+ ) const override;
+
+ bool next(ColumnVectorBatch& data) override;
+
+ const RowReaderOptions& getRowReaderOptions() const;  // refers to this reader's own copy
+
+ CompressionKind getCompression() const;
+
+ uint64_t getCompressionSize() const;
+
+ uint64_t getRowNumber() const override;
+
+ void seekToRow(uint64_t rowNumber) override;
+
+ MemoryPool* getMemoryPool() const;
+
+ };
+
+ class ReaderImpl : public Reader {  // Reader implementation; shares FileContents via std::shared_ptr
+ private:
+ // FileContents
+ std::shared_ptr<FileContents> contents;
+
+ // inputs
+ const ReaderOptions options;  // copied, not referenced: a reference would dangle if the caller passed a temporary
+ const uint64_t fileLength;
+ const uint64_t postscriptLength;
+
+ // custom memory pool
+ MemoryPool& memoryPool;
+
+ // footer
+ proto::Footer* footer;  // not owned; presumably points into contents->footer
+ uint64_t numberOfStripes;
+ uint64_t getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns);  // presumably the helper behind the public getMemoryUse* overloads
+
+ // internal methods
+ void readMetadata() const;
+ void checkOrcVersion();
+
+ // metadata
+ mutable std::unique_ptr<proto::Metadata> metadata;  // mutable so the const readMetadata() can fill it
+ mutable bool isMetadataLoaded;
+ public:
+ /**
+ * Constructor that lets the user specify additional options.
+ * @param contents of the file
+ * @param options options for reading (copied into the reader)
+ * @param fileLength the length of the file in bytes
+ * @param postscriptLength the length of the postscript in bytes
+ */
+ ReaderImpl(std::shared_ptr<FileContents> contents,
+ const ReaderOptions& options,
+ uint64_t fileLength,
+ uint64_t postscriptLength);
+
+ const ReaderOptions& getReaderOptions() const;  // refers to this reader's own copy
+
+ CompressionKind getCompression() const override;
+
+ std::string getFormatVersion() const override;
+
+ WriterVersion getWriterVersion() const override;
+
+ uint64_t getNumberOfRows() const override;
+
+ uint64_t getRowIndexStride() const override;
+
+ std::list<std::string> getMetadataKeys() const override;
+
+ std::string getMetadataValue(const std::string& key) const override;
+
+ bool hasMetadataValue(const std::string& key) const override;
+
+ uint64_t getCompressionSize() const override;
+
+ uint64_t getNumberOfStripes() const override;
+
+ std::unique_ptr<StripeInformation> getStripe(uint64_t
+ ) const override;
+
+ uint64_t getNumberOfStripeStatistics() const override;
+
+ const std::string& getStreamName() const override;
+
+ std::unique_ptr<Statistics>
+ getStripeStatistics(uint64_t stripeIndex) const override;
+
+ std::unique_ptr<RowReader> getRowReader(const RowReaderOptions& options
+ ) const override;
+
+ uint64_t getContentLength() const override;
+ uint64_t getStripeStatisticsLength() const override;
+ uint64_t getFileFooterLength() const override;
+ uint64_t getFilePostscriptLength() const override;
+ uint64_t getFileLength() const override;
+
+ std::unique_ptr<Statistics> getStatistics() const override;
+
+ std::unique_ptr<ColumnStatistics> getColumnStatistics(uint32_t columnId
+ ) const override;
+
+ std::string getSerializedFileTail() const override;
+
+ const Type& getType() const override;
+
+ bool hasCorrectStatistics() const override;
+
+ const proto::PostScript* getPostscript() const {return contents->postscript.get();}
+
+ uint64_t getBlockSize() const {return contents->blockSize;}
+
+ const proto::Footer* getFooter() const {return contents->footer.get();}
+
+ const Type* getSchema() const {return contents->schema.get();}
+
+ InputStream* getStream() const {return contents->stream.get();}
+
+ uint64_t getMemoryUse(int stripeIx = -1) override;  // NOTE(review): default args on virtuals bind to the static type; keep in sync with the base
+
+ uint64_t getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx=-1) override;
+
+ uint64_t getMemoryUseByName(const std::list<std::string>& names, int stripeIx=-1) override;
+
+ uint64_t getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx=-1) override;
+ };
+
+}// namespace
+
+#endif